You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/01/06 07:25:30 UTC
[incubator-pegasus] branch master updated: feat(online_migration): part3 - support ingestion_behind for pegasus (#862)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new fa0ec7a feat(online_migration): part3 - support ingestion_behind for pegasus (#862)
fa0ec7a is described below
commit fa0ec7aa6e16f79313e7679c202eae5f5a3280fc
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Thu Jan 6 15:25:22 2022 +0800
feat(online_migration): part3 - support ingestion_behind for pegasus (#862)
---
rdsn | 2 +-
src/base/pegasus_const.cpp | 2 ++
src/base/pegasus_const.h | 2 ++
src/server/pegasus_server_impl.cpp | 18 ++++++++++
src/server/pegasus_server_impl.h | 2 ++
src/server/pegasus_server_write.cpp | 2 +-
src/server/pegasus_write_service.cpp | 8 ++---
src/server/pegasus_write_service.h | 6 ++--
src/server/pegasus_write_service_impl.h | 10 +++---
src/server/rocksdb_wrapper.cpp | 5 ++-
src/server/rocksdb_wrapper.h | 4 ++-
src/shell/commands/bulk_load.cpp | 9 +++--
src/shell/main.cpp | 2 +-
src/test/function_test/test_bulk_load.cpp | 60 +++++++++++++++++++++++++++++--
14 files changed, 112 insertions(+), 20 deletions(-)
diff --git a/rdsn b/rdsn
index 982c2bd..93fe4c4 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 982c2bd180a081fe2c772191cad0ea279e0e2685
+Subproject commit 93fe4c4f4cfd8d85bf0925dccc398e3eec9a8999
diff --git a/src/base/pegasus_const.cpp b/src/base/pegasus_const.cpp
index f8127ba..c93c0ba 100644
--- a/src/base/pegasus_const.cpp
+++ b/src/base/pegasus_const.cpp
@@ -99,4 +99,6 @@ const std::string SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partitio
const std::string USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string READ_SIZE_THROTTLING("replica.read_throttling_by_size");
+
+const std::string ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_ingest_behind");
} // namespace pegasus
diff --git a/src/base/pegasus_const.h b/src/base/pegasus_const.h
index bbe4193..c2a47ce 100644
--- a/src/base/pegasus_const.h
+++ b/src/base/pegasus_const.h
@@ -70,4 +70,6 @@ extern const std::string SPLIT_VALIDATE_PARTITION_HASH;
extern const std::string USER_SPECIFIED_COMPACTION;
extern const std::string READ_SIZE_THROTTLING;
+
+extern const std::string ROCKSDB_ALLOW_INGEST_BEHIND;
} // namespace pegasus
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index a0fa94c..72a2448 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -1487,11 +1487,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
+ _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
// When create new DB, we have to create a new column family to store meta data (meta column
// family).
_db_opts.create_missing_column_families = true;
+ _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
@@ -2695,6 +2697,22 @@ void pegasus_server_impl::update_user_specified_compaction(
}
}
+bool pegasus_server_impl::parse_allow_ingest_behind(const std::map<std::string, std::string> &envs)
+{ // Derives the allow_ingest_behind flag from the ROCKSDB_ALLOW_INGEST_BEHIND entry of 'envs'.
+ bool allow_ingest_behind = false; // result when the env is absent or its value is rejected
+ const auto &iter = envs.find(ROCKSDB_ALLOW_INGEST_BEHIND);
+ if (iter == envs.end()) { // env not set: return the default without logging
+ return allow_ingest_behind;
+ }
+ if (!dsn::buf2bool(iter->second, allow_ingest_behind)) { // value rejected by buf2bool
+ dwarn_replica(
+ "{}={} is invalid, set allow_ingest_behind = false", iter->first, iter->second);
+ return false; // explicit false: do not rely on what buf2bool left in the out-param
+ }
+ ddebug_replica("update allow_ingest_behind = {}", allow_ingest_behind);
+ return allow_ingest_behind; // the value parsed from the env
+}
+
bool pegasus_server_impl::parse_compression_types(
const std::string &config, std::vector<rocksdb::CompressionType> &compression_per_level)
{
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 37ff805..c85e807 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -284,6 +284,8 @@ private:
void update_throttling_controller(const std::map<std::string, std::string> &envs);
+ bool parse_allow_ingest_behind(const std::map<std::string, std::string> &envs);
+
// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool parse_compression_types(const std::string &config,
diff --git a/src/server/pegasus_server_write.cpp b/src/server/pegasus_server_write.cpp
index 804294c..8501bd1 100644
--- a/src/server/pegasus_server_write.cpp
+++ b/src/server/pegasus_server_write.cpp
@@ -195,7 +195,7 @@ void pegasus_server_write::init_non_batch_write_handlers()
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
- return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
+ return _write_svc->ingest_files(_decree, rpc.request(), rpc.response());
}},
};
}
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index d31dc0e..7658cad 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -373,9 +373,9 @@ int pegasus_write_service::duplicate(int64_t decree,
return empty_put(ctx.decree);
}
-int pegasus_write_service::ingestion_files(int64_t decree,
- const dsn::replication::ingestion_request &req,
- dsn::replication::ingestion_response &resp)
+int pegasus_write_service::ingest_files(int64_t decree,
+ const dsn::replication::ingestion_request &req,
+ dsn::replication::ingestion_response &resp)
{
// TODO(heyuchen): consider cu
@@ -391,7 +391,7 @@ int pegasus_write_service::ingestion_files(int64_t decree,
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
dsn::error_code err =
- _impl->ingestion_files(decree, _server->bulk_load_dir(), req.metadata);
+ _impl->ingest_files(decree, _server->bulk_load_dir(), req.metadata, req.ingest_behind);
if (err == dsn::ERR_OK) {
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED);
} else {
diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h
index 6bc4de7..badbd12 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -142,9 +142,9 @@ public:
dsn::apps::duplicate_response &resp);
// Execute bulk load ingestion
- int ingestion_files(int64_t decree,
- const dsn::replication::ingestion_request &req,
- dsn::replication::ingestion_response &resp);
+ int ingest_files(int64_t decree,
+ const dsn::replication::ingestion_request &req,
+ dsn::replication::ingestion_response &resp);
/// For batch write.
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index d0278dc..6f96763 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -483,9 +483,10 @@ public:
// \return ERR_WRONG_CHECKSUM: verify files failed
// \return ERR_INGESTION_FAILED: rocksdb ingestion failed
// \return ERR_OK: rocksdb ingestion succeed
- dsn::error_code ingestion_files(const int64_t decree,
- const std::string &bulk_load_dir,
- const dsn::replication::bulk_load_metadata &metadata)
+ dsn::error_code ingest_files(const int64_t decree,
+ const std::string &bulk_load_dir,
+ const dsn::replication::bulk_load_metadata &metadata,
+ const bool ingest_behind)
{
// verify external files before ingestion
std::vector<std::string> sst_file_list;
@@ -495,7 +496,8 @@ public:
}
// ingest external files
- if (dsn_unlikely(_rocksdb_wrapper->ingestion_files(decree, sst_file_list) != 0)) {
+ if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, ingest_behind) !=
+ 0)) {
return dsn::ERR_INGESTION_FAILED;
}
return dsn::ERR_OK;
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index a6db999..ed52631 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -180,10 +180,13 @@ int rocksdb_wrapper::write_batch_delete(int64_t decree, dsn::string_view raw_key
void rocksdb_wrapper::clear_up_write_batch() { _write_batch->Clear(); }
-int rocksdb_wrapper::ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list)
+int rocksdb_wrapper::ingest_files(int64_t decree,
+ const std::vector<std::string> &sst_file_list,
+ const bool ingest_behind)
{
rocksdb::IngestExternalFileOptions ifo;
ifo.move_files = true;
+ ifo.ingest_behind = ingest_behind;
rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo);
if (dsn_unlikely(!s.ok())) {
derror_rocksdb("IngestExternalFile", s.ToString(), "decree = {}", decree);
diff --git a/src/server/rocksdb_wrapper.h b/src/server/rocksdb_wrapper.h
index b29bb7d..4f8abdf 100644
--- a/src/server/rocksdb_wrapper.h
+++ b/src/server/rocksdb_wrapper.h
@@ -64,7 +64,9 @@ public:
int write(int64_t decree);
int write_batch_delete(int64_t decree, dsn::string_view raw_key);
void clear_up_write_batch();
- int ingestion_files(int64_t decree, const std::vector<std::string> &sst_file_list);
+ int ingest_files(int64_t decree,
+ const std::vector<std::string> &sst_file_list,
+ const bool ingest_behind);
void set_default_ttl(uint32_t ttl);
diff --git a/src/shell/commands/bulk_load.cpp b/src/shell/commands/bulk_load.cpp
index c55a150..6da7c44 100644
--- a/src/shell/commands/bulk_load.cpp
+++ b/src/shell/commands/bulk_load.cpp
@@ -25,17 +25,19 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
{"cluster_name", required_argument, 0, 'c'},
{"file_provider_type", required_argument, 0, 'p'},
{"root_path", required_argument, 0, 'r'},
+ {"ingest_behind", no_argument, 0, 'i'},
{0, 0, 0, 0}};
std::string app_name;
std::string cluster_name;
std::string file_provider_type;
std::string remote_root_path;
+ bool ingest_behind = false;
optind = 0;
while (true) {
int option_index = 0;
int c;
- c = getopt_long(args.argc, args.argv, "a:c:p:r:", long_options, &option_index);
+ c = getopt_long(args.argc, args.argv, "a:c:p:r:i", long_options, &option_index);
if (c == -1)
break;
switch (c) {
@@ -51,6 +53,9 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
case 'r':
remote_root_path = optarg;
break;
+ case 'i':
+ ingest_behind = true;
+ break;
default:
return false;
}
@@ -76,7 +81,7 @@ bool start_bulk_load(command_executor *e, shell_context *sc, arguments args)
}
auto err_resp = sc->ddl_client->start_bulk_load(
- app_name, cluster_name, file_provider_type, remote_root_path);
+ app_name, cluster_name, file_provider_type, remote_root_path, ingest_behind);
dsn::error_s err = err_resp.get_error();
std::string hint_msg;
if (err.is_ok()) {
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 1b05ef1..8be8555 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -462,7 +462,7 @@ static command_executor commands[] = {
"start_bulk_load",
"start app bulk load",
"<-a --app_name str> <-c --cluster_name str> <-p --file_provider_type str> <-r "
- "--root_path>",
+ "--root_path> [-i --ingest_behind]",
start_bulk_load,
},
{
diff --git a/src/test/function_test/test_bulk_load.cpp b/src/test/function_test/test_bulk_load.cpp
index 7f5c11b..37bec7b 100644
--- a/src/test/function_test/test_bulk_load.cpp
+++ b/src/test/function_test/test_bulk_load.cpp
@@ -101,9 +101,10 @@ public:
system(copy_file_cmd.c_str());
}
- error_code start_bulk_load()
+ error_code start_bulk_load(bool ingest_behind = false)
{
- auto err_resp = ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT);
+ auto err_resp =
+ ddl_client->start_bulk_load(APP_NAME, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
return err_resp.get_value().err;
}
@@ -123,6 +124,26 @@ public:
system(cmd.c_str());
}
+ void update_allow_ingest_behind(const std::string &allow_ingest_behind)
+ { // Sets the ROCKSDB_ALLOW_INGEST_BEHIND app env on APP_NAME, then restarts onebox.
+ // update the app env through the DDL client
+ std::vector<std::string> keys;
+ keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
+ std::vector<std::string> values;
+ values.emplace_back(allow_ingest_behind);
+ auto err_resp = ddl_client->set_app_envs(APP_NAME, keys, values);
+ ASSERT_EQ(err_resp.get_value().err, ERR_OK); // a fatal failure here returns from this helper only
+ std::cout << "sleep 30s to wait app_envs update" << std::endl;
+ std::this_thread::sleep_for(std::chrono::seconds(30));
+ // restart onebox so that the updated env takes effect
+ chdir(pegasus_root_dir.c_str()); // run.sh is invoked by relative path from this directory
+ system("./run.sh stop_onebox"); // NOTE(review): chdir/system results are not checked
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ system("./run.sh start_onebox -w");
+ std::this_thread::sleep_for(std::chrono::seconds(20));
+ chdir(working_root_dir.c_str()); // switch back to the working directory
+ }
+
bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
{
int64_t sleep_time = 5;
@@ -269,3 +290,38 @@ TEST_F(bulk_load_test, bulk_load_tests)
operate_data(operation::DEL, "", 15);
operate_data(operation::NO_VALUE, "", 15);
}
+
+///
+/// case1: inconsistent ingest_behind
+/// case2: bulk load(ingest_behind) succeed with data verified
+/// case3: bulk load data consistent:
+/// - bulk load data will be overridden by old data
+/// - get/set/del succeed after bulk load
+///
+TEST_F(bulk_load_test, bulk_load_ingest_behind_tests)
+{
+ // app envs allow_ingest_behind = false, request ingest_behind = true
+ ASSERT_EQ(start_bulk_load(true), ERR_INCONSISTENT_STATE);
+
+ update_allow_ingest_behind("true");
+
+ // write old data
+ operate_data(operation::SET, "oldValue", 10);
+ operate_data(operation::GET, "oldValue", 10);
+
+ ASSERT_EQ(start_bulk_load(true), ERR_OK);
+ ASSERT_EQ(wait_bulk_load_finish(300), bulk_load_status::BLS_SUCCEED);
+
+ std::cout << "Start to verify data..." << std::endl;
+ // old values are not overridden by bulk-loaded data (see case3 above)
+ operate_data(operation::GET, "oldValue", 10);
+ ASSERT_TRUE(verify_data("hashkey", "sortkey"));
+
+ // write data after bulk load succeed
+ operate_data(operation::SET, "valueAfterBulkLoad", 20);
+ operate_data(operation::GET, "valueAfterBulkLoad", 20);
+
+ // del data after bulk load succeed
+ operate_data(operation::DEL, "", 15);
+ operate_data(operation::NO_VALUE, "", 15);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org