You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/16 13:35:34 UTC

[incubator-doris] branch master updated: [Feature] Add a http interface for single tablet migration between different disks (#5101)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 99b22c9  [Feature] Add a http interface for single tablet migration between different disks (#5101)
99b22c9 is described below

commit 99b22c92f826d39f61ac568528b360d7c76f4248
Author: weizuo93 <68...@users.noreply.github.com>
AuthorDate: Sat Jan 16 21:35:20 2021 +0800

    [Feature] Add a http interface for single tablet migration between different disks (#5101)
    
    Based on PR #4475, this patch adds a new feature: migrating a single tablet between different disks through an HTTP interface.
    Co-authored-by: weizuo <we...@xiaomi.com>
---
 be/src/common/config.h                             |   6 +
 be/src/http/CMakeLists.txt                         |   1 +
 be/src/http/action/tablet_migration_action.cpp     | 239 +++++++++++++++++++++
 be/src/http/action/tablet_migration_action.h       |  79 +++++++
 be/src/service/http_service.cpp                    |   5 +
 .../http-actions/tablet-migration-action.md        |  89 ++++++++
 .../http-actions/tablet-migration-action.md        |  87 ++++++++
 7 files changed, 506 insertions(+)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 66e52a3..9a28c6b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -305,6 +305,12 @@ CONF_mInt64(tablet_scan_frequency_time_node_interval_second, "300");
 CONF_mInt32(compaction_tablet_scan_frequency_factor, "0");
 CONF_mInt32(compaction_tablet_compaction_score_factor, "1");
 
+// Lower and upper bounds on the number of threads in the thread pool that
+// executes single-tablet migration tasks submitted via /api/tablet_migration.
+// They are read once, when the tablet migration HTTP action is constructed.
+CONF_Int32(min_tablet_migration_threads, "1");
+CONF_Int32(max_tablet_migration_threads, "1");
+
+// Number of finished tablet migration task results kept in memory so that
+// /api/tablet_migration?goal=status can still report them; once the limit is
+// reached the oldest result is evicted first.
+CONF_mInt32(finished_migration_tasks_size, "10000");
+
 // Port to start debug webserver on
 CONF_Int32(webserver_port, "8040");
 // Number of webserver workers
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt
index 176955b..4d7caac 100644
--- a/be/src/http/CMakeLists.txt
+++ b/be/src/http/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(Webserver STATIC
   http_client.cpp
   action/mini_load.cpp
   action/health_action.cpp
+  action/tablet_migration_action.cpp
   action/tablets_info_action.cpp
   action/tablets_distribution_action.cpp
   action/checksum_action.cpp
diff --git a/be/src/http/action/tablet_migration_action.cpp b/be/src/http/action/tablet_migration_action.cpp
new file mode 100644
index 0000000..ed7ea21
--- /dev/null
+++ b/be/src/http/action/tablet_migration_action.cpp
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/tablet_migration_action.h"
+
+#include <string>
+
+#include "gutil/strings/substitute.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "olap/task/engine_storage_migration_task.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+// The migration thread pool is created once, when the action is constructed.
+TabletMigrationAction::TabletMigrationAction() {
+    _init_migration_action();
+}
+
+// Builds the thread pool that runs migration tasks asynchronously. Its size is
+// bounded by config::min_tablet_migration_threads and
+// config::max_tablet_migration_threads. A build failure used to be ignored
+// silently; it is now logged so that a misconfigured pool is diagnosable.
+void TabletMigrationAction::_init_migration_action() {
+    const int32_t max_thread_num = config::max_tablet_migration_threads;
+    const int32_t min_thread_num = config::min_tablet_migration_threads;
+    Status st = ThreadPoolBuilder("MigrationTaskThreadPool")
+                        .set_min_threads(min_thread_num)
+                        .set_max_threads(max_thread_num)
+                        .build(&_migration_thread_pool);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to build tablet migration thread pool, min_threads="
+                     << min_thread_num << ", max_threads=" << max_thread_num
+                     << ", error: " << st.to_string();
+    }
+}
+
+// Sends `body` as an HTTP 200 response with a JSON content type. The tablet
+// migration API reports every outcome, including errors, this way.
+static void send_migration_reply(HttpRequest* req, const std::string& body) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
+    HttpChannel::send_reply(req, HttpStatus::OK, body);
+}
+
+// Entry point of the tablet migration HTTP API.
+//
+// Request parameters: tablet_id, schema_hash, goal ("run" or "status") and,
+// for goal=run, disk (path of the destination data dir).
+//  - goal=run validates the request and submits an asynchronous migration to
+//    _migration_thread_pool. Only one pending/running task per
+//    (tablet_id, schema_hash) is allowed.
+//  - goal=status reports the pending/running task of the tablet if there is
+//    one, otherwise the most recent finished task recorded for it.
+// Every response, including errors, is JSON sent with HTTP status 200.
+void TabletMigrationAction::handle(HttpRequest* req) {
+    int64_t tablet_id = 0;
+    int32_t schema_hash = 0;
+    string dest_disk;
+    string goal;
+    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
+    if (!status.ok()) {
+        send_migration_reply(req, to_json(status));
+        return;
+    }
+
+    if (goal == "run") {
+        TabletSharedPtr tablet;
+        DataDir* dest_store = nullptr;
+        status = _check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store);
+        if (status.ok() && _migration_thread_pool == nullptr) {
+            // The thread pool is unavailable (it was not built at construction
+            // time), so nothing could ever run the task.
+            status = Status::InternalError("Migration task submission failed");
+        }
+        if (status.ok()) {
+            const MigrationTask current_task(tablet_id, schema_hash, dest_disk);
+            {
+                std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                auto inserted = _migration_tasks.emplace(current_task, "submitted");
+                if (!inserted.second) {
+                    status = Status::AlreadyExist(strings::Substitute(
+                            "There is a migration task for this tablet already exists. "
+                            "dest_disk is $0 .",
+                            (inserted.first->first)._dest_disk));
+                }
+            }
+            if (status.ok()) {
+                // Capture by value only: the task runs on the pool after handle()
+                // has returned, so it must not refer to this function's locals
+                // (capturing `tablet` / `dest_store` by reference would dangle).
+                Status submit_status = _migration_thread_pool->submit_func(
+                        [this, tablet, dest_store, current_task]() {
+                            {
+                                std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                                _migration_tasks[current_task] = "running";
+                            }
+                            Status result_status = _execute_tablet_migration(tablet, dest_store);
+                            std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                            _migration_tasks.erase(current_task);
+                            // Trim with a loop so that a lowered limit also shrinks the
+                            // history; a non-positive limit keeps no history and an
+                            // empty deque is never popped.
+                            const int32_t limit_conf = config::finished_migration_tasks_size;
+                            const size_t limit =
+                                    limit_conf > 0 ? static_cast<size_t>(limit_conf) : 0;
+                            while (!_finished_migration_tasks.empty() &&
+                                   _finished_migration_tasks.size() >= limit) {
+                                _finished_migration_tasks.pop_front();
+                            }
+                            if (limit > 0) {
+                                _finished_migration_tasks.emplace_back(current_task,
+                                                                       result_status);
+                            }
+                        });
+                if (!submit_status.ok()) {
+                    status = Status::InternalError("Migration task submission failed");
+                    std::unique_lock<std::mutex> lock(_migration_status_mutex);
+                    _migration_tasks.erase(current_task);
+                }
+            }
+        }
+        if (status.ok()) {
+            send_migration_reply(req,
+                                 "{\"status\": \"Success\", \"msg\": \"migration task is successfully "
+                                 "submitted.\"}");
+        } else {
+            send_migration_reply(req, to_json(status));
+        }
+        return;
+    }
+
+    DCHECK(goal == "status");
+    const MigrationTask query_task(tablet_id, schema_hash);
+    std::string status_result;
+    {
+        std::unique_lock<std::mutex> lock(_migration_status_mutex);
+        auto it_task = _migration_tasks.find(query_task);
+        if (it_task != _migration_tasks.end()) {
+            status_result = "{\"status\": \"Success\", \"msg\": \"migration task is " +
+                            it_task->second + "\", \"dest_disk\": \"" +
+                            (it_task->first)._dest_disk + "\"}";
+        } else {
+            // Finished results are appended at the back, so scan newest-first.
+            status = Status::NotFound("Migration task not found");
+            for (auto it = _finished_migration_tasks.rbegin();
+                 it != _finished_migration_tasks.rend(); ++it) {
+                const MigrationTask& finished_task = it->first;
+                if (finished_task._tablet_id != tablet_id ||
+                    finished_task._schema_hash != schema_hash) {
+                    continue;
+                }
+                status = it->second;
+                if (status.ok()) {
+                    status_result =
+                            "{\"status\": \"Success\", \"msg\": \"migration task has "
+                            "finished successfully\", \"dest_disk\": \"" +
+                            finished_task._dest_disk + "\"}";
+                }
+                break;
+            }
+        }
+    }
+    if (!status.ok()) {
+        status_result = to_json(status);
+    }
+    send_migration_reply(req, status_result);
+}
+
+// Parses and validates the parameters of a tablet migration request.
+//
+// Outputs: tablet_id and schema_hash from the "tablet_id" / "schema_hash"
+// parameters, dest_disk from "disk" (may be empty; it is not validated here)
+// and goal from "goal", which must be "run" or "status".
+// The numeric parameters must be complete decimal integers that fit their
+// target types: trailing characters and out-of-range values are rejected
+// instead of being silently ignored or truncated.
+// Returns InternalError describing the first invalid parameter.
+Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id,
+                                           int32_t& schema_hash, string& dest_disk, string& goal) {
+    const std::string& req_tablet_id = req->param("tablet_id");
+    const std::string& req_schema_hash = req->param("schema_hash");
+    // Logs the raw values and builds the error returned for unusable numbers.
+    auto convert_failed = [&](const std::string& reason) {
+        LOG(WARNING) << "invalid argument.tablet_id:" << req_tablet_id
+                     << ", schema_hash:" << req_schema_hash;
+        return Status::InternalError(strings::Substitute("Convert failed, $0", reason));
+    };
+    try {
+        size_t consumed = 0;
+        // std::stoll itself throws for non-numeric input and for int64 overflow.
+        const long long parsed_tablet_id = std::stoll(req_tablet_id, &consumed);
+        if (consumed != req_tablet_id.size()) {
+            return convert_failed("trailing characters in tablet_id");
+        }
+        const long long parsed_schema_hash = std::stoll(req_schema_hash, &consumed);
+        if (consumed != req_schema_hash.size()) {
+            return convert_failed("trailing characters in schema_hash");
+        }
+        // Reject values that would change when narrowed to int32_t.
+        if (parsed_schema_hash != static_cast<int32_t>(parsed_schema_hash)) {
+            return convert_failed("schema_hash out of int32 range");
+        }
+        tablet_id = parsed_tablet_id;
+        schema_hash = static_cast<int32_t>(parsed_schema_hash);
+    } catch (const std::exception& e) {
+        return convert_failed(e.what());
+    }
+    dest_disk = req->param("disk");
+    goal = req->param("goal");
+    if (goal != "run" && goal != "status") {
+        return Status::InternalError("invalid goal argument.");
+    }
+    return Status::OK();
+}
+
+// Checks that a "run" request can be carried out and resolves its targets.
+//
+// Stores the tablet found for (tablet_id, schema_hash) in `tablet` and, once
+// the tablet exists, the data dir registered under dest_disk in `*dest_store`.
+// Fails with:
+//  - NotFound      if the tablet or the data dir does not exist,
+//  - AlreadyExist  if the tablet already lives in that data dir,
+//  - InternalError if the data dir's reach_capacity_limit() reports true for
+//                  the tablet's footprint.
+Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, int32_t schema_hash,
+                                                     string dest_disk, TabletSharedPtr& tablet,
+                                                     DataDir** dest_store) {
+    tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
+    if (!tablet) {
+        LOG(WARNING) << "no tablet for tablet_id:" << tablet_id << " schema hash:" << schema_hash;
+        return Status::NotFound("Tablet not found");
+    }
+
+    // The caller names the destination data dir explicitly.
+    DataDir* const target_dir = StorageEngine::instance()->get_store(dest_disk);
+    *dest_store = target_dir;
+    if (!target_dir) {
+        LOG(WARNING) << "data dir not found: " << dest_disk;
+        return Status::NotFound("Disk not found");
+    }
+    if (target_dir == tablet->data_dir()) {
+        LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
+        return Status::AlreadyExist("Tablet already exist in destination disk");
+    }
+
+    // Refuse the migration when the destination cannot take the tablet's data.
+    const int64_t footprint = tablet->tablet_footprint();
+    if (!target_dir->reach_capacity_limit(footprint)) {
+        return Status::OK();
+    }
+    LOG(WARNING) << "reach the capacity limit of path: " << target_dir->path()
+                 << ", tablet size: " << footprint;
+    return Status::InternalError("Insufficient disk capacity");
+}
+
+// Migrates `tablet` into `dest_store` on the calling thread by running an
+// EngineStorageMigrationTask through StorageEngine::execute_task, and logs the
+// outcome. A result other than OLAP_SUCCESS becomes
+// InternalError("migration task failed, res: <code>").
+Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
+                                                        DataDir* dest_store) {
+    // Capture identifying details up front; they are only used for logging.
+    const int64_t id = tablet->tablet_id();
+    const int32_t hash = tablet->schema_hash();
+    const string target_path = dest_store->path();
+
+    EngineStorageMigrationTask migration_task(tablet, dest_store);
+    const OLAPStatus res = StorageEngine::instance()->execute_task(&migration_task);
+    if (res == OLAP_SUCCESS) {
+        LOG(INFO) << "tablet migrate success. tablet_id=" << id << ", schema_hash=" << hash
+                  << ", dest_disk=" << target_path << ", status:" << res;
+        return Status::OK();
+    }
+    LOG(WARNING) << "tablet migrate failed. tablet_id=" << id << ", schema_hash=" << hash
+                 << ", dest_disk=" << target_path << ", status:" << res;
+    return Status::InternalError(strings::Substitute("migration task failed, res: $0", res));
+}
+
+} // namespace doris
diff --git a/be/src/http/action/tablet_migration_action.h b/be/src/http/action/tablet_migration_action.h
new file mode 100644
index 0000000..4e5a92b
--- /dev/null
+++ b/be/src/http/action/tablet_migration_action.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <deque>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "common/status.h"
+#include "gen_cpp/Status_types.h"
+#include "http/http_handler.h"
+#include "olap/data_dir.h"
+#include "olap/tablet.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
+// Migrate a tablet from a disk to another.
+//
+// HTTP handler for GET /api/tablet_migration: goal=run migrates a single
+// tablet to another data dir asynchronously on a dedicated thread pool, and
+// goal=status reports the state of such a migration.
+class TabletMigrationAction : public HttpHandler {
+public:
+    TabletMigrationAction();
+    void handle(HttpRequest* req) override;
+    // Creates _migration_thread_pool from the tablet migration thread configs.
+    void _init_migration_action();
+    // Synchronously migrates `tablet` into `dest_store`.
+    Status _execute_tablet_migration(TabletSharedPtr tablet, DataDir* dest_store);
+    // Parses tablet_id, schema_hash, disk and goal from the request.
+    Status _check_param(HttpRequest* req, int64_t& tablet_id, int32_t& schema_hash,
+                        string& dest_disk, string& goal);
+    // Resolves the tablet and destination data dir and checks the migration is possible.
+    Status _check_migrate_request(int64_t tablet_id, int32_t schema_hash, string dest_disk,
+                                  TabletSharedPtr& tablet, DataDir** dest_store);
+
+private:
+    // Identifies a migration task. Only (tablet_id, schema_hash) take part in
+    // ordering, so a map keyed by MigrationTask holds at most one entry per
+    // tablet; _dest_disk is carried along for reporting.
+    struct MigrationTask {
+        MigrationTask(int64_t tablet_id, int32_t schema_hash)
+                : _tablet_id(tablet_id), _schema_hash(schema_hash) {}
+
+        MigrationTask(int64_t tablet_id, int32_t schema_hash, std::string dest_disk)
+                : _tablet_id(tablet_id),
+                  _schema_hash(schema_hash),
+                  _dest_disk(std::move(dest_disk)) {}
+
+        bool operator<(const MigrationTask& right) const {
+            if (_tablet_id != right._tablet_id) {
+                return _tablet_id < right._tablet_id;
+            } else if (_schema_hash != right._schema_hash) {
+                return _schema_hash < right._schema_hash;
+            } else {
+                return false;
+            }
+        }
+
+        std::string to_string() const {
+            std::stringstream ss;
+            ss << "MigrationTask: tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
+               << ", dest_disk=" << _dest_disk;
+            return ss.str();
+        }
+
+        int64_t _tablet_id;
+        int32_t _schema_hash;
+        std::string _dest_disk;
+    };
+
+    // Guards _migration_tasks and _finished_migration_tasks.
+    std::mutex _migration_status_mutex;
+    // Pending/running tasks mapped to their state string.
+    std::map<MigrationTask, std::string> _migration_tasks;
+    // Results of finished tasks, oldest first.
+    std::deque<std::pair<MigrationTask, Status>> _finished_migration_tasks;
+
+    // Declared after the state its tasks touch: members are destroyed in
+    // reverse declaration order, so the pool is torn down first (assuming
+    // ~ThreadPool stops its workers) and tasks never see destroyed state.
+    std::unique_ptr<ThreadPool> _migration_thread_pool;
+};
+} // namespace doris
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index a2e4302..2685c1e 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -29,6 +29,7 @@
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
 #include "http/action/tablets_distribution_action.h"
+#include "http/action/tablet_migration_action.h"
 #include "http/action/tablets_info_action.h"
 #include "http/action/update_config_action.h"
 #include "http/default_path_handlers.h"
@@ -93,6 +94,10 @@ Status HttpService::start() {
     TabletsDistributionAction* tablets_distribution_action = new TabletsDistributionAction();
     _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution", tablets_distribution_action);
 
+    // Register tablet migration action
+    TabletMigrationAction* tablet_migration_action = new TabletMigrationAction();
+    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration", tablet_migration_action);
+
     // register pprof actions
     PprofActions::setup(_env, _ev_http_server.get());
 
diff --git a/docs/en/administrator-guide/http-actions/tablet-migration-action.md b/docs/en/administrator-guide/http-actions/tablet-migration-action.md
new file mode 100644
index 0000000..7ecc552
--- /dev/null
+++ b/docs/en/administrator-guide/http-actions/tablet-migration-action.md
@@ -0,0 +1,89 @@
+---
+{
+    "title": "MIGRATE SINGLE TABLET TO A PARTICULAR DISK",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MIGRATE SINGLE TABLET TO A PARTICULAR DISK
+   
+Migrate single tablet to a particular disk.
+
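+Submit a migration task. `disk` is the path of the destination data directory on this BE. Quote the URL so that the shell does not treat `&` as a command separator:
+
+```
+curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=run&tablet_id=xxx&schema_hash=xxx&disk=xxx"
+```
+
+The response is the result of submitting the migration task: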
+
+```
+    {
+        status: "Success",
+        msg: "migration task is successfully submitted."
+    }
+```
+
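+or, if the task cannot be submitted (`msg` explains why, e.g. the tablet or disk does not exist, or a migration task for this tablet is already in progress):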
+
+```
+    {
+        status: "Fail",
+        msg: "Migration task submission failed"
+    }
+```
+
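+Show the status of the migration task of a tablet:
+
+```
+curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=status&tablet_id=xxx&schema_hash=xxx"
+```
+
+The response describes the tablet's pending or running migration task, or else its most recent finished one. While the task is waiting in the queue, `msg` is `migration task is submitted`; while it executes: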
+
+```
+    {
+        status: "Success",
+        msg: "migration task is running",
+        dest_disk: "xxxxxx"
+    }
+```
+
+or
+
+```
+    {
+        status: "Success",
+        msg: "migration task has finished successfully",
+        dest_disk: "xxxxxx"
+    }
+```
+
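+or, if the migration task failed (a tablet with no recorded task likewise returns `status: "Fail"` with `msg: "Migration task not found"`):
+
+```
+    {
+        status: "Fail",
+        msg: "migration task failed, res: xxx"
+    }
+```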
\ No newline at end of file
diff --git a/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md b/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md
new file mode 100644
index 0000000..c2855b6
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/http-actions/tablet-migration-action.md
@@ -0,0 +1,87 @@
+---
+{
+    "title": "MIGRATE SINGLE TABLET TO A PARTICULAR DISK",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MIGRATE SINGLE TABLET TO A PARTICULAR DISK
+   
+在BE节点上迁移单个tablet到指定磁盘
+
+提交迁移任务:
+
+```
+curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=run&tablet_id=xxx&schema_hash=xxx&disk=xxx"
+```
+
+返回值就是tablet迁移任务的提交结果:
+
+```
+    {
+        status: "Success",
+        msg: "migration task is successfully submitted."
+    }
+```
+或
+```
+    {
+        status: "Fail",
+        msg: "Migration task submission failed"
+    }
+```
+
+查询迁移任务状态:
+
+```
+curl -X GET "http://be_host:webserver_port/api/tablet_migration?goal=status&tablet_id=xxx&schema_hash=xxx"
+```
+
+返回值就是tablet迁移任务执行状态:
+
+```
+    {
+        status: "Success",
+        msg: "migration task is running",
+        dest_disk: "xxxxxx"
+    }
+```
+
+或
+
+```
+    {
+        status: "Success",
+        msg: "migration task has finished successfully",
+        dest_disk: "xxxxxx"
+    }
+```
+
+或(迁移任务执行失败时):
+
+```
+    {
+        status: "Fail",
+        msg: "migration task failed, res: xxx"
+    }
+```
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org