You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/01/06 07:25:30 UTC

[incubator-pegasus] branch master updated: feat(online_migration): part3 - support ingestion_behind for pegasus (#862)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new fa0ec7a  feat(online_migration): part3 - support ingestion_behind for pegasus (#862)
fa0ec7a is described below

commit fa0ec7aa6e16f79313e7679c202eae5f5a3280fc
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Thu Jan 6 15:25:22 2022 +0800

    feat(online_migration): part3 - support ingestion_behind for pegasus (#862)
---
 rdsn                                      |  2 +-
 src/base/pegasus_const.cpp                |  2 ++
 src/base/pegasus_const.h                  |  2 ++
 src/server/pegasus_server_impl.cpp        | 18 ++++++++++
 src/server/pegasus_server_impl.h          |  2 ++
 src/server/pegasus_server_write.cpp       |  2 +-
 src/server/pegasus_write_service.cpp      |  8 ++---
 src/server/pegasus_write_service.h        |  6 ++--
 src/server/pegasus_write_service_impl.h   | 10 +++---
 src/server/rocksdb_wrapper.cpp            |  5 ++-
 src/server/rocksdb_wrapper.h              |  4 ++-
 src/shell/commands/bulk_load.cpp          |  9 +++--
 src/shell/main.cpp                        |  2 +-
 src/test/function_test/test_bulk_load.cpp | 60 +++++++++++++++++++++++++++++--
 14 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/rdsn b/rdsn
index 982c2bd..93fe4c4 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 982c2bd180a081fe2c772191cad0ea279e0e2685
+Subproject commit 93fe4c4f4cfd8d85bf0925dccc398e3eec9a8999
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index f8127ba..c93c0ba 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -99,4 +99,6 @@ const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partitio
 const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
 
 const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
+
+const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
 } // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index bbe4193..c2a47ce 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -70,4 +70,6 @@ extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
 extern const std::string USER_SPECIFIED_COMPACTION;
 
 extern const std::string READ_SIZE_THROTTLING;
+
+extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
 } // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index a0fa94c..72a2448 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1487,11 +1487,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
             // only be initialized with default values when calling 'LoadLatestOptions', see
             // 'rocksdb/utilities/options_util.h'.
             reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
+            _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
         }
     } else {
         // When create new DB, we have to create a new column family to store meta data (meta column
         // family).
         _db_opts.create_missing_column_families = true;
+        _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
     }
 
     std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
@@ -2695,6 +2697,22 @@ void pegasus_server_impl::update_user_specified_compaction(
     }
 }
 
+bool pegasus_server_impl::parse_allow_ingest_behind(const std::map<std::string, std::string> &envs)
+{
+    bool allow_ingest_behind = false; // result when ROCKSDB_ALLOW_INGEST_BEHIND is not in envs
+    const auto &iter = envs.find(ROCKSDB_ALLOW_INGEST_BEHIND);
+    if (iter == envs.end()) {
+        return allow_ingest_behind;
+    }
+    if (!dsn::buf2bool(iter->second, allow_ingest_behind)) { // env value rejected by buf2bool
+        dwarn_replica(
+            "{}={} is invalid, set allow_ingest_behind = false", iter->first, iter->second);
+        return false; // explicit false: do not rely on what buf2bool left in the out-param
+    }
+    ddebug_replica("update allow_ingest_behind = {}", allow_ingest_behind);
+    return allow_ingest_behind; // value parsed from the env string
+}
+
 bool pegasus_server_impl::parse_compression_types(
     const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
 {
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 37ff805..c85e807 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -284,6 +284,8 @@ private:
 
     void update_throttling_controller(const std::map<std::string, std::string> &envs);
 
+    bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);
+
     // return true if parse compression types 'config' success, otherwise return false.
     // 'compression_per_level' will not be changed if parse failed.
     bool parse_compression_types(const std::string &config,
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 804294c..8501bd1 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -195,7 +195,7 @@ void pegasus_server_write::init_non_batch_write_handlers()
         {dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
          [this](dsn::message_ex *request) -> int {
              auto rpc = ingestion_rpc::auto_reply(request);
-             return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
+             return _write_svc->ingest_files(_decree, rpc.request(), rpc.response());
          }},
     };
 }
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index d31dc0e..7658cad 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -373,9 +373,9 @@ int pegasus_write_service::duplicate(int64_t decree,
     return empty_put(ctx.decree);
 }
 
-int pegasus_write_service::ingestion_files(int64_t decree,
-                                           const dsn::replication::ingestion_request &req,
-                                           dsn::replication::ingestion_response &resp)
+int pegasus_write_service::ingest_files(int64_t decree,
+                                        const dsn::replication::ingestion_request &req,
+                                        dsn::replication::ingestion_response &resp)
 {
     // TODO(heyuchen): consider cu
 
@@ -391,7 +391,7 @@ int pegasus_write_service::ingestion_files(int64_t decree,
     _server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
     dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
         dsn::error_code err =
-            _impl->ingestion_files(decree, _server->bulk_load_dir(), req.metadata);
+            _impl->ingest_files(decree, _server->bulk_load_dir(), req.metadata, req.ingest_behind);
         if (err == dsn::ERR_OK) {
             _server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED);
         } else {
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 6bc4de7..badbd12 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -142,9 +142,9 @@ public:
                   dsn::apps::duplicate_response &resp);
 
     // Execute bulk load ingestion
-    int ingestion_files(int64_t decree,
-                        const dsn::replication::ingestion_request &req,
-                        dsn::replication::ingestion_response &resp);
+    int ingest_files(int64_t decree,
+                     const dsn::replication::ingestion_request &req,
+                     dsn::replication::ingestion_response &resp);
 
     /// For batch write.
 
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index d0278dc..6f96763 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -483,9 +483,10 @@ public:
     // \return ERR_WRONG_CHECKSUM: verify files failed
     // \return ERR_INGESTION_FAILED: rocksdb ingestion failed
     // \return ERR_OK: rocksdb ingestion succeed
-    dsn::error_code ingestion_files(const int64_t decree,
-                                    const std::string &bulk_load_dir,
-                                    const dsn::replication::bulk_load_metadata &metadata)
+    dsn::error_code ingest_files(const int64_t decree,
+                                 const std::string &bulk_load_dir,
+                                 const dsn::replication::bulk_load_metadata &metadata,
+                                 const bool ingest_behind)
     {
         // verify external files before ingestion
         std::vector<std::string> sst_file_list;
@@ -495,7 +496,8 @@ public:
         }
 
         // ingest external files
-        if (dsn_unlikely(_rocksdb_wrapper->ingestion_files(decree, sst_file_list) != 0)) {
+        if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, ingest_behind) !=
+                         0)) {
             return dsn::ERR_INGESTION_FAILED;
         }
         return dsn::ERR_OK;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index a6db999..ed52631 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -180,10 +180,13 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key
 
 void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }
 
-int rocksdb_wrapper::ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list)
+int rocksdb_wrapper::ingest_files(int64_t decree,
+                                  const std::vector<std::string> &sst_file_list,
+                                  const bool ingest_behind)
 {
     rocksdb::IngestExternalFileOptions ifo;
     ifo.move_files = true;
+    ifo.ingest_behind = ingest_behind;
     rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo);
     if (dsn_unlikely(!s.ok())) {
         derror_rocksdb("IngestExternalFile", s.ToString(), "decree = {}", decree);
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index b29bb7d..4f8abdf 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -64,7 +64,9 @@ public:
     int write(int64_t decree);
     int write_batch_delete(int64_t decree, dsn::string_view raw_key);
     void clear_up_write_batch();
-    int ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list);
+    int ingest_files(int64_t decree,
+                     const std::vector<std::string> &sst_file_list,
+                     const bool ingest_behind);
 
     void set_default_ttl(uint32_t ttl);
 
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index c55a150..6da7c44 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -25,17 +25,19 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
                                            {"cluster_name", required_argument, 0, 'c'},
                                            {"file_provider_type", required_argument, 0, 'p'},
                                            {"root_path", required_argument, 0, 'r'},
+                                           {"ingest_behind", no_argument, 0, 'i'},
                                            {0, 0, 0, 0}};
     std::string app_name;
     std::string cluster_name;
     std::string file_provider_type;
     std::string remote_root_path;
+    bool ingest_behind = false;
 
     optind = 0;
     while (true) {
         int option_index = 0;
         int c;
-        c = getopt_long(args.argc, args.argv, "a:c:p:r:", long_options, &option_index);
+        c = getopt_long(args.argc, args.argv, "a:c:p:r:i", long_options, &option_index);
         if (c == -1)
             break;
         switch (c) {
@@ -51,6 +53,9 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
         case 'r':
             remote_root_path = optarg;
             break;
+        case 'i':
+            ingest_behind = true;
+            break;
         default:
             return false;
         }
@@ -76,7 +81,7 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
     }
 
     auto err_resp = sc->ddl_client->start_bulk_load(
-        app_name, cluster_name, file_provider_type, remote_root_path);
+        app_name, cluster_name, file_provider_type, remote_root_path, ingest_behind);
     dsn::error_s err = err_resp.get_error();
     std::string hint_msg;
     if (err.is_ok()) {
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 1b05ef1..8be8555 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -462,7 +462,7 @@ static command_executor commands[] = {
         "start_bulk_load",
         "start app bulk load",
         "<-a --app_name str> <-c --cluster_name str> <-p --file_provider_type str> <-r "
-        "--root_path>",
+        "--root_path> [-i --ingest_behind]",
         start_bulk_load,
     },
     {
diff --git a/src/test/function_test/test_bulk_load.cpp b/src/test/function_test/test_bulk_load.cpp
index 7f5c11b..37bec7b 100644
--- a/src/test/function_test/test_bulk_load.cpp
+++ b/src/test/function_test/test_bulk_load.cpp
@@ -101,9 +101,10 @@ public:
         system(copy_file_cmd.c_str());
     }
 
-    error_code start_bulk_load()
+    error_code start_bulk_load(bool ingest_behind = false)
     {
-        auto err_resp = ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT);
+        auto err_resp =
+            ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
         return err_resp.get_value().err;
     }
 
@@ -123,6 +124,26 @@ public:
         system(cmd.c_str());
     }
 
+    void update_allow_ingest_behind(const std::string &allow_ingest_behind)
+    {
+        // set the ROCKSDB_ALLOW_INGEST_BEHIND app env of APP_NAME to the given value
+        std::vector<std::string> keys;
+        keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
+        std::vector<std::string> values;
+        values.emplace_back(allow_ingest_behind);
+        auto err_resp = ddl_client->set_app_envs(APP_NAME, keys, values);
+        ASSERT_EQ(err_resp.get_value().err, ERR_OK); // NOTE(review): on failure only this helper returns
+        std::cout << "sleep 30s to wait app_envs update" << std::endl;
+        std::this_thread::sleep_for(std::chrono::seconds(30));
+        // restart onebox (run.sh lives in pegasus_root_dir) so the updated env takes effect
+        chdir(pegasus_root_dir.c_str());
+        system("./run.sh stop_onebox");
+        std::this_thread::sleep_for(std::chrono::seconds(5));
+        system("./run.sh start_onebox -w");
+        std::this_thread::sleep_for(std::chrono::seconds(20));
+        chdir(working_root_dir.c_str()); // switch back to working_root_dir
+    }
+
     bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
     {
         int64_t sleep_time = 5;
@@ -269,3 +290,38 @@ TEST_F(bulk_load_test, bulk_load_tests)
     operate_data(operation::DEL, "", 15);
     operate_data(operation::NO_VALUE, "", 15);
 }
+
+///
+/// case1: inconsistent ingest_behind
+/// case2: bulk load(ingest_behind) succeed with data verified
+/// case3: bulk load data consistent:
+///     - bulk load data will be overridden by old data
+///     - get/set/del succeed after bulk load
+///
+TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
+{
+    // app envs allow_ingest_behind = false, request ingest_behind = true
+    ASSERT_EQ(start_bulk_load(true), ERR_INCONSISTENT_STATE);
+
+    update_allow_ingest_behind("true");
+
+    // write old data
+    operate_data(operation::SET, "oldValue", 10);
+    operate_data(operation::GET, "oldValue", 10);
+
+    ASSERT_EQ(start_bulk_load(true), ERR_OK);
+    ASSERT_EQ(wait_bulk_load_finish(300), bulk_load_status::BLS_SUCCEED);
+
+    std::cout << "Start to verify data..." << std::endl;
+    // old values are still expected: bulk loaded data is overridden by old data (case3)
+    operate_data(operation::GET, "oldValue", 10);
+    ASSERT_TRUE(verify_data("hashkey", "sortkey"));
+
+    // write data after bulk load succeed
+    operate_data(operation::SET, "valueAfterBulkLoad", 20);
+    operate_data(operation::GET, "valueAfterBulkLoad", 20);
+
+    // del data after bulk load succeed
+    operate_data(operation::DEL, "", 15);
+    operate_data(operation::NO_VALUE, "", 15);
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org