You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by "empiredan (via GitHub)" <gi...@apache.org> on 2023/09/20 14:15:43 UTC

[GitHub] [incubator-pegasus] empiredan commented on a diff in pull request #1616: refactor(test): refactor bulk load function test

empiredan commented on code in PR #1616:
URL: https://github.com/apache/incubator-pegasus/pull/1616#discussion_r1331695050


##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -55,93 +65,115 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
 protected:
     bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     "onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        blob value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value.data(), value.length()),
+                                            bulk_load_info_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    error_code start_bulk_load(bool ingest_behind = false)
+    // Generate the '.bulk_load_info.meta' file according to the 'bulk_load_info' file
+    // in path 'bulk_load_info_path'.
+    void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
     {
-        auto err_resp =
-            ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
-        return err_resp.get_value().err;
+        dist::block_service::file_metadata fm;
+        ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, fm.size));
+        ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5));
+        std::string value = nlohmann::json(fm).dump();
+        string bulk_load_info_meta_path =

Review Comment:
   ```suggestion
           auto bulk_load_info_meta_path =
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -55,93 +65,115 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
 protected:
     bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     "onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        blob value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);

Review Comment:
   ```suggestion
           auto value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -55,93 +65,115 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
 protected:
     bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     "onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        blob value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value.data(), value.length()),
+                                            bulk_load_info_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    error_code start_bulk_load(bool ingest_behind = false)
+    // Generate the '.bulk_load_info.meta' file according to the 'bulk_load_info' file
+    // in path 'bulk_load_info_path'.
+    void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
     {
-        auto err_resp =
-            ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
-        return err_resp.get_value().err;
+        dist::block_service::file_metadata fm;
+        ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, fm.size));
+        ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5));
+        std::string value = nlohmann::json(fm).dump();
+        string bulk_load_info_meta_path =
+            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, kCluster, app_name_);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value),
+                                            bulk_load_info_meta_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    void remove_file(const string &file_path)
+    void copy_bulk_load_files()
+    {
+        // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we can generate it.
+        // Prepare bulk load files.
+        // The source data has 8 partitions.
+        ASSERT_EQ(8, partition_count_);
+        NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
+        NO_FATALS(run_cmd_from_project_root(
+            fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad, kLocalServiceRoot)));
+
+        // Generate 'bulk_load_info'.
+        string bulk_load_info_path =

Review Comment:
   ```suggestion
           auto bulk_load_info_path =
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -156,52 +188,51 @@ class bulk_load_test : public test_util
 
     void verify_bulk_load_data()
     {
-        ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
-        ASSERT_NO_FATAL_FAILURE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, kBulkLoadSortKeyPrefix1));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix2, kBulkLoadSortKeyPrefix2));
     }
 
     void verify_data(const string &hashkey_prefix, const string &sortkey_prefix)
     {
-        const string &expected_value = VALUE;
-        for (int i = 0; i < COUNT; ++i) {
+        for (int i = 0; i < kBulkLoadItemCount; ++i) {
             string hash_key = hashkey_prefix + std::to_string(i);
-            for (int j = 0; j < COUNT; ++j) {
+            for (int j = 0; j < kBulkLoadItemCount; ++j) {
                 string sort_key = sortkey_prefix + std::to_string(j);
-                string act_value;
-                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, act_value)) << hash_key << ","
-                                                                                << sort_key;
-                ASSERT_EQ(expected_value, act_value) << hash_key << "," << sort_key;
+                string actual_value;
+                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, actual_value))
+                    << hash_key << "," << sort_key;
+                ASSERT_EQ(kBulkLoadValue, actual_value) << hash_key << "," << sort_key;
             }
         }
     }
 
-    enum operation
+    enum class operation
     {
         GET,
         SET,
         DEL,
         NO_VALUE
     };
-    void operate_data(bulk_load_test::operation op, const string &value, int count)
+    void operate_data(operation op, const string &value, int count)
     {
         for (int i = 0; i < count; ++i) {
-            string hash_key = HASHKEY_PREFIX + std::to_string(i);
-            string sort_key = SORTKEY_PREFIX + std::to_string(i);
+            string hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);
+            string sort_key = fmt::format("{}{}", kBulkLoadSortKeyPrefix2, i);

Review Comment:
   ```suggestion
               auto sort_key = fmt::format("{}{}", kBulkLoadSortKeyPrefix2, i);
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -55,93 +65,115 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
 protected:
     bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     "onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        blob value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value.data(), value.length()),
+                                            bulk_load_info_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    error_code start_bulk_load(bool ingest_behind = false)
+    // Generate the '.bulk_load_info.meta' file according to the 'bulk_load_info' file
+    // in path 'bulk_load_info_path'.
+    void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
     {
-        auto err_resp =
-            ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
-        return err_resp.get_value().err;
+        dist::block_service::file_metadata fm;
+        ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, fm.size));
+        ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5));
+        std::string value = nlohmann::json(fm).dump();
+        string bulk_load_info_meta_path =
+            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, kCluster, app_name_);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value),
+                                            bulk_load_info_meta_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    void remove_file(const string &file_path)
+    void copy_bulk_load_files()
+    {
+        // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we can generate it.
+        // Prepare bulk load files.
+        // The source data has 8 partitions.
+        ASSERT_EQ(8, partition_count_);
+        NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
+        NO_FATALS(run_cmd_from_project_root(
+            fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad, kLocalServiceRoot)));
+
+        // Generate 'bulk_load_info'.
+        string bulk_load_info_path =
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, kCluster, app_name_);
+        NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, partition_count_),
+                                          bulk_load_info_path));
+
+        // Generate '.bulk_load_info.meta'.
+        NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+    }
+
+    error_code start_bulk_load(bool ingest_behind = false)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm " + file_path));
+        return ddl_client_
+            ->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad, ingest_behind)
+            .get_value()
+            .err;
     }
 
-    void replace_bulk_load_info()
+    void remove_file(const string &file_path)
     {
-        string cmd = "cp -R "
-                     "src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/"
-                     "mock_bulk_load_info/. " +
-                     bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + "/";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        NO_FATALS(run_cmd_from_project_root("rm " + file_path));
     }
 
     void update_allow_ingest_behind(const string &allow_ingest_behind)
     {
-        // update app envs
-        std::vector<string> keys;
-        keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
-        std::vector<string> values;
-        values.emplace_back(allow_ingest_behind);
-        ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys, values).get_value().err);
+        const auto ret = ddl_client_->set_app_envs(
+            app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind});
+        ASSERT_EQ(ERR_OK, ret.get_value().err);
         std::cout << "sleep 31s to wait app_envs update" << std::endl;
         std::this_thread::sleep_for(std::chrono::seconds(31));
     }
 
-    bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
+    bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds)
     {
         int64_t sleep_time = 5;
         error_code err = ERR_OK;

Review Comment:
   ```suggestion
           auto err = ERR_OK;
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -55,93 +65,115 @@ using std::string;
 ///  - `bulk_load_root` sub-directory stores right data
 ///     - Please do not rename any files or directories under this folder
 ///
-/// The app who is executing bulk load:
-/// - app_name is `temp`, app_id is 2, partition_count is 8
+/// The app to test bulk load functionality:
+/// - partition count should be 8
 ///
 /// Data:
-/// hashkey: hashi sortkey: sorti value: newValue       i=[0, 1000]
-/// hashkey: hashkeyj sortkey: sortkeyj value: newValue j=[0, 1000]
+/// hashkey: hash${i} sortkey: sort${i} value: newValue       i=[0, 1000]
+/// hashkey: hashkey${j} sortkey: sortkey${j} value: newValue j=[0, 1000]
 ///
 class bulk_load_test : public test_util
 {
 protected:
     bulk_load_test() : test_util(map<string, string>({{"rocksdb.allow_ingest_behind", "true"}}))
     {
         TRICKY_CODE_TO_AVOID_LINK_ERROR;
-        bulk_load_local_root_ =
-            utils::filesystem::path_combine("onebox/block_service/local_service/", LOCAL_ROOT);
+        bulk_load_local_app_root_ =
+            fmt::format("{}/{}/{}", kLocalBulkLoadRoot, kCluster, app_name_);
     }
 
     void SetUp() override
     {
         test_util::SetUp();
-        ASSERT_NO_FATAL_FAILURE(copy_bulk_load_files());
+        NO_FATALS(copy_bulk_load_files());
     }
 
     void TearDown() override
     {
         ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm -rf onebox/block_service"));
+        NO_FATALS(run_cmd_from_project_root("rm -rf " + kLocalBulkLoadRoot));
     }
 
-    void copy_bulk_load_files()
+    // Generate the 'bulk_load_info' file according to 'bli' to path 'bulk_load_info_path'.
+    void generate_bulk_load_info(const bulk_load_info &bli, const std::string &bulk_load_info_path)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("mkdir -p onebox/block_service"));
-        ASSERT_NO_FATAL_FAILURE(
-            run_cmd_from_project_root("mkdir -p onebox/block_service/local_service"));
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
-            "cp -r src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/" +
-            LOCAL_ROOT + " onebox/block_service/local_service"));
-        string cmd = "echo '{\"app_id\":" + std::to_string(app_id_) +
-                     ",\"app_name\":\"temp\",\"partition_count\":8}' > "
-                     "onebox/block_service/local_service/bulk_load_root/cluster/temp/"
-                     "bulk_load_info";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        blob value = dsn::json::json_forwarder<bulk_load_info>::encode(bli);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value.data(), value.length()),
+                                            bulk_load_info_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    error_code start_bulk_load(bool ingest_behind = false)
+    // Generate the '.bulk_load_info.meta' file according to the 'bulk_load_info' file
+    // in path 'bulk_load_info_path'.
+    void generate_bulk_load_info_meta(const std::string &bulk_load_info_path)
     {
-        auto err_resp =
-            ddl_client_->start_bulk_load(app_name_, CLUSTER, PROVIDER, LOCAL_ROOT, ingest_behind);
-        return err_resp.get_value().err;
+        dist::block_service::file_metadata fm;
+        ASSERT_TRUE(utils::filesystem::file_size(bulk_load_info_path, fm.size));
+        ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(bulk_load_info_path, fm.md5));
+        std::string value = nlohmann::json(fm).dump();
+        string bulk_load_info_meta_path =
+            fmt::format("{}/{}/{}/.bulk_load_info.meta", kLocalBulkLoadRoot, kCluster, app_name_);
+        auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                            rocksdb::Slice(value),
+                                            bulk_load_info_meta_path,
+                                            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
 
-    void remove_file(const string &file_path)
+    void copy_bulk_load_files()
+    {
+        // TODO(yingchun): remove the 'mock_bulk_load_info' file, because we can generate it.
+        // Prepare bulk load files.
+        // The source data has 8 partitions.
+        ASSERT_EQ(8, partition_count_);
+        NO_FATALS(run_cmd_from_project_root("mkdir -p " + kLocalBulkLoadRoot));
+        NO_FATALS(run_cmd_from_project_root(
+            fmt::format("cp -r {}/{} {}", kSourceFilesRoot, kBulkLoad, kLocalServiceRoot)));
+
+        // Generate 'bulk_load_info'.
+        string bulk_load_info_path =
+            fmt::format("{}/{}/{}/bulk_load_info", kLocalBulkLoadRoot, kCluster, app_name_);
+        NO_FATALS(generate_bulk_load_info(bulk_load_info(app_id_, app_name_, partition_count_),
+                                          bulk_load_info_path));
+
+        // Generate '.bulk_load_info.meta'.
+        NO_FATALS(generate_bulk_load_info_meta(bulk_load_info_path));
+    }
+
+    error_code start_bulk_load(bool ingest_behind = false)
     {
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root("rm " + file_path));
+        return ddl_client_
+            ->start_bulk_load(app_name_, kCluster, kProvider, kBulkLoad, ingest_behind)
+            .get_value()
+            .err;
     }
 
-    void replace_bulk_load_info()
+    void remove_file(const string &file_path)
     {
-        string cmd = "cp -R "
-                     "src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files/"
-                     "mock_bulk_load_info/. " +
-                     bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + "/";
-        ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(cmd));
+        NO_FATALS(run_cmd_from_project_root("rm " + file_path));
     }
 
     void update_allow_ingest_behind(const string &allow_ingest_behind)
     {
-        // update app envs
-        std::vector<string> keys;
-        keys.emplace_back(ROCKSDB_ALLOW_INGEST_BEHIND);
-        std::vector<string> values;
-        values.emplace_back(allow_ingest_behind);
-        ASSERT_EQ(ERR_OK, ddl_client_->set_app_envs(app_name_, keys, values).get_value().err);
+        const auto ret = ddl_client_->set_app_envs(
+            app_name_, {ROCKSDB_ALLOW_INGEST_BEHIND}, {allow_ingest_behind});
+        ASSERT_EQ(ERR_OK, ret.get_value().err);
         std::cout << "sleep 31s to wait app_envs update" << std::endl;
         std::this_thread::sleep_for(std::chrono::seconds(31));
     }
 
-    bulk_load_status::type wait_bulk_load_finish(int64_t seconds)
+    bulk_load_status::type wait_bulk_load_finish(int64_t remain_seconds)
     {
         int64_t sleep_time = 5;
         error_code err = ERR_OK;
 
         bulk_load_status::type last_status = bulk_load_status::BLS_INVALID;

Review Comment:
   ```suggestion
           auto last_status = bulk_load_status::BLS_INVALID;
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -156,52 +188,51 @@ class bulk_load_test : public test_util
 
     void verify_bulk_load_data()
     {
-        ASSERT_NO_FATAL_FAILURE(verify_data("hashkey", "sortkey"));
-        ASSERT_NO_FATAL_FAILURE(verify_data(HASHKEY_PREFIX, SORTKEY_PREFIX));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, kBulkLoadSortKeyPrefix1));
+        NO_FATALS(verify_data(kBulkLoadHashKeyPrefix2, kBulkLoadSortKeyPrefix2));
     }
 
     void verify_data(const string &hashkey_prefix, const string &sortkey_prefix)
     {
-        const string &expected_value = VALUE;
-        for (int i = 0; i < COUNT; ++i) {
+        for (int i = 0; i < kBulkLoadItemCount; ++i) {
             string hash_key = hashkey_prefix + std::to_string(i);
-            for (int j = 0; j < COUNT; ++j) {
+            for (int j = 0; j < kBulkLoadItemCount; ++j) {
                 string sort_key = sortkey_prefix + std::to_string(j);
-                string act_value;
-                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, act_value)) << hash_key << ","
-                                                                                << sort_key;
-                ASSERT_EQ(expected_value, act_value) << hash_key << "," << sort_key;
+                string actual_value;
+                ASSERT_EQ(PERR_OK, client_->get(hash_key, sort_key, actual_value))
+                    << hash_key << "," << sort_key;
+                ASSERT_EQ(kBulkLoadValue, actual_value) << hash_key << "," << sort_key;
             }
         }
     }
 
-    enum operation
+    enum class operation
     {
         GET,
         SET,
         DEL,
         NO_VALUE
     };
-    void operate_data(bulk_load_test::operation op, const string &value, int count)
+    void operate_data(operation op, const string &value, int count)
     {
         for (int i = 0; i < count; ++i) {
-            string hash_key = HASHKEY_PREFIX + std::to_string(i);
-            string sort_key = SORTKEY_PREFIX + std::to_string(i);
+            string hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);

Review Comment:
   ```suggestion
               auto hash_key = fmt::format("{}{}", kBulkLoadHashKeyPrefix2, i);
   ```



##########
src/test/function_test/bulk_load/test_bulk_load.cpp:
##########
@@ -210,108 +241,108 @@ class bulk_load_test : public test_util
         }
     }
 
-protected:
-    string bulk_load_local_root_;
+    void check_bulk_load(bool ingest_behind,
+                         const std::string &value_before_bulk_load,
+                         const std::string &value_after_bulk_load)
+    {
+        // Write some data before bulk load.
+        NO_FATALS(operate_data(operation::SET, value_before_bulk_load, 10));
+        NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));
+
+        // Start bulk load and wait until it completes.
+        ASSERT_EQ(ERR_OK, start_bulk_load(ingest_behind));
+        ASSERT_EQ(bulk_load_status::BLS_SUCCEED, wait_bulk_load_finish(300));
+
+        std::cout << "Start to verify data..." << std::endl;
+        if (ingest_behind) {
+            // Values have NOT been overwritten by the bulk load data.
+            NO_FATALS(operate_data(operation::GET, value_before_bulk_load, 10));
+            NO_FATALS(verify_data(kBulkLoadHashKeyPrefix1, kBulkLoadSortKeyPrefix1));
+        } else {
+            // Values have been overwritten by the bulk load data.
+            NO_FATALS(operate_data(operation::GET, kBulkLoadValue, 10));
+            NO_FATALS(verify_bulk_load_data());
+        }
 
-    const string LOCAL_ROOT = "bulk_load_root";
-    const string CLUSTER = "cluster";
-    const string PROVIDER = "local_service";
+        // Writing new data succeeds after bulk load.
+        NO_FATALS(operate_data(operation::SET, value_after_bulk_load, 20));
+        NO_FATALS(operate_data(operation::GET, value_after_bulk_load, 20));
 
-    const string HASHKEY_PREFIX = "hash";
-    const string SORTKEY_PREFIX = "sort";
-    const string VALUE = "newValue";
-    const int32_t COUNT = 1000;
+        // Deleting data succeeds after bulk load.
+        NO_FATALS(operate_data(operation::DEL, "", 15));
+        NO_FATALS(operate_data(operation::NO_VALUE, "", 15));
+    }
+
+protected:
+    string bulk_load_local_app_root_;
+    const string kSourceFilesRoot =
+        "src/test/function_test/bulk_load/pegasus-bulk-load-function-test-files";
+    const string kLocalServiceRoot = "onebox/block_service/local_service";
+    const string kLocalBulkLoadRoot = "onebox/block_service/local_service/bulk_load_root";
+    const string kBulkLoad = "bulk_load_root";
+    const string kCluster = "cluster";
+    const string kProvider = "local_service";
+
+    const int32_t kBulkLoadItemCount = 1000;
+    const string kBulkLoadHashKeyPrefix1 = "hashkey";
+    const string kBulkLoadSortKeyPrefix1 = "sortkey";
+    const string kBulkLoadValue = "newValue";
+
+    // Real time write operations will use this prefix as well.
+    const string kBulkLoadHashKeyPrefix2 = "hash";
+    const string kBulkLoadSortKeyPrefix2 = "sort";
 };
 
-///
-/// case1: lack of `bulk_load_info` file
-/// case2: `bulk_load_info` file inconsistent with app_info
-///
-TEST_F(bulk_load_test, bulk_load_test_failed)
+// Test bulk load failed because the 'bulk_load_info' file is missing
+TEST_F(bulk_load_test, missing_bulk_load_info)
 {
-    // bulk load failed because `bulk_load_info` file is missing
-    ASSERT_NO_FATAL_FAILURE(
-        remove_file(bulk_load_local_root_ + "/" + CLUSTER + "/" + app_name_ + "/bulk_load_info"));
+    NO_FATALS(remove_file(bulk_load_local_app_root_ + "/bulk_load_info"));
     ASSERT_EQ(ERR_OBJECT_NOT_FOUND, start_bulk_load());
+}
 
-    // bulk load failed because `bulk_load_info` file inconsistent with current app_info
-    ASSERT_NO_FATAL_FAILURE(replace_bulk_load_info());
-    ASSERT_EQ(ERR_INCONSISTENT_STATE, start_bulk_load());
+// Test bulk load failed because the 'bulk_load_info' file is inconsistent with the actual app info.
+TEST_F(bulk_load_test, inconsistent_bulk_load_info)
+{
+    // Only 'app_id' and 'partition_count' will be checked in Pegasus server, so just inject these
+    // kinds of inconsistencies.
+    bulk_load_info tests[] = {{app_id_ + 1, app_name_, partition_count_},
+                              {app_id_, app_name_, partition_count_ * 2}};
+    for (const auto &test : tests) {
+        // Generate inconsistent 'bulk_load_info'.
+        string bulk_load_info_path =

Review Comment:
   ```suggestion
           auto bulk_load_info_path =
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org