You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/11 09:52:25 UTC

(doris) branch master updated: [feature](merge-cloud) Add cloud meta manager skeleton (#29833)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 984ffa00fb4 [feature](merge-cloud) Add cloud meta manager skeleton (#29833)
984ffa00fb4 is described below

commit 984ffa00fb41615e77f34c1239a237b361276560
Author: walter <w4...@gmail.com>
AuthorDate: Thu Jan 11 17:52:17 2024 +0800

    [feature](merge-cloud) Add cloud meta manager skeleton (#29833)
    
    Co-authored-by: plat1ko <pl...@gmail.com>
    Co-authored-by: Gavin Chou <ga...@gmail.com>
    Co-authored-by: Xin Liao <li...@126.com>
    Co-authored-by: Xiaocc <59...@qq.com>
    Co-authored-by: deardeng <56...@qq.com>
    Co-authored-by: Lei Zhang <27...@users.noreply.github.com>
    Co-authored-by: Lightman <31...@users.noreply.github.com>
    Co-authored-by: Luwei <81...@qq.com>
    Co-authored-by: Yongqiang YANG <da...@gmail.com>
    Co-authored-by: YueW <45...@users.noreply.github.com>
    Co-authored-by: bobhan1 <bh...@outlook.com>
---
 be/src/cloud/cloud_meta_mgr.cpp | 287 ++++++++++++++++++++++++++++++++++++++++
 be/src/cloud/cloud_meta_mgr.h   |  76 +++++++++++
 be/src/cloud/config.cpp         |   9 ++
 be/src/cloud/config.h           |  14 ++
 be/src/cloud/meta_mgr.h         |  86 ++++++++++++
 be/src/util/network_util.cpp    |  39 ++++++
 be/src/util/network_util.h      |   2 +
 7 files changed, 513 insertions(+)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
new file mode 100644
index 00000000000..f2fcf5132f6
--- /dev/null
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "cloud/cloud_meta_mgr.h"
+
+#include <brpc/channel.h>
+#include <brpc/controller.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <vector>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gen_cpp/cloud.pb.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "util/network_util.h"
+#include "util/s3_util.h"
+
+namespace doris::cloud {
+using namespace ErrorCode;
+
+bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
+
+class MetaServiceProxy {
+public:
+    static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
+        SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", Status::OK(), stub);
+        return get_pooled_client(stub);
+    }
+
+private:
+    // Returns a stub from a lazily built pool of proxies, rotating through it round-robin.
+    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
+        static std::once_flag proxies_flag;
+        static size_t num_proxies = 1;
+        static std::atomic<size_t> index(0);
+        static std::unique_ptr<MetaServiceProxy[]> proxies;
+        std::call_once(proxies_flag, +[]() {
+            // Keep one proxy if the size is <= 0: zero would make `% num_proxies` divide by zero.
+            const auto pool_size = config::meta_service_connection_pool_size;
+            if (config::meta_service_connection_pooled && pool_size > 0) {
+                num_proxies = static_cast<size_t>(pool_size);
+            }
+            proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
+        });
+
+        for (size_t i = 0; i + 1 < num_proxies; ++i) {
+            size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
+            Status s = proxies[next_index].get(stub);
+            if (s.ok()) return Status::OK();
+        }
+        size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
+        return proxies[next_index].get(stub);
+    }
+
+    static Status init_channel(brpc::Channel* channel) {
+        static std::atomic<size_t> index = 1;
+
+        std::string ip;
+        uint16_t port;
+        Status s = get_meta_service_ip_and_port(&ip, &port);
+        if (!s.ok()) {
+            LOG(WARNING) << "fail to get meta service ip and port: " << s;
+            return s;
+        }
+
+        size_t next_id = index.fetch_add(1, std::memory_order_relaxed);
+        brpc::ChannelOptions options;
+        options.connection_group = fmt::format("ms_{}", next_id);
+        if (channel->Init(ip.c_str(), port, &options) != 0) {
+            return Status::InternalError("fail to init brpc channel, ip: {}, port: {}", ip, port);
+        }
+        return Status::OK();
+    }
+
+    static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) {
+        std::string parsed_host;
+        if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) {
+            return Status::InvalidArgument("invalid meta service endpoint: {}",
+                                           config::meta_service_endpoint);
+        }
+        if (is_valid_ip(parsed_host)) {
+            *ip = std::move(parsed_host);
+            return Status::OK();
+        }
+        return hostname_to_ip(parsed_host, *ip);
+    }
+
+    bool is_idle_timeout(long now) {
+        auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
+        return idle_timeout_ms > 0 &&
+               _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
+    }
+
+    Status get(std::shared_ptr<MetaService_Stub>* stub) {
+        using namespace std::chrono;
+
+        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+        {
+            std::shared_lock lock(_mutex);
+            if (_deadline_ms >= now && !is_idle_timeout(now)) {
+                _last_access_at_ms.store(now, std::memory_order_relaxed);
+                *stub = _stub;
+                return Status::OK();
+            }
+        }
+
+        auto channel = std::make_unique<brpc::Channel>();
+        Status s = init_channel(channel.get());
+        if (UNLIKELY(!s.ok())) {
+            return s;
+        }
+
+        *stub = std::make_shared<MetaService_Stub>(channel.release(),
+                                                   google::protobuf::Service::STUB_OWNS_CHANNEL);
+
+        long deadline = now;
+        if (config::meta_service_connection_age_base_minutes > 0) {
+            std::default_random_engine rng(static_cast<uint32_t>(now));
+            std::uniform_int_distribution<> uni(
+                    config::meta_service_connection_age_base_minutes,
+                    config::meta_service_connection_age_base_minutes * 2);
+            deadline = now + duration_cast<milliseconds>(minutes(uni(rng))).count();
+        } else {
+            deadline = LONG_MAX;
+        }
+
+        // Last one WIN
+        std::unique_lock lock(_mutex);
+        _last_access_at_ms.store(now, std::memory_order_relaxed);
+        _deadline_ms = deadline;
+        _stub = *stub;
+        return Status::OK();
+    }
+
+    std::shared_mutex _mutex;
+    std::atomic<long> _last_access_at_ms {0};
+    long _deadline_ms {0};
+    std::shared_ptr<MetaService_Stub> _stub;
+};
+
+Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
+    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
+                                      tablet_meta);
+
+    std::shared_ptr<MetaService_Stub> stub; // fetched once and reused by every attempt below
+    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+    int tried = 0;
+    while (true) { // one get_tablet RPC per pass; only cntl.Failed() leads to another pass
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+        GetTabletRequest req;
+        GetTabletResponse resp;
+        req.set_cloud_unique_id(config::cloud_unique_id);
+        req.set_tablet_id(tablet_id);
+        stub->get_tablet(&cntl, &req, &resp, nullptr);
+        int retry_times = config::meta_service_rpc_retry_times; // re-read on every attempt
+        if (cntl.Failed()) {
+            if (tried++ < retry_times) {
+                auto rng = std::default_random_engine(static_cast<uint32_t>(
+                        std::chrono::steady_clock::now().time_since_epoch().count()));
+                std::uniform_int_distribution<uint32_t> u(20, 200); // ms, once tried >= 100
+                std::uniform_int_distribution<uint32_t> u1(500, 1000); // ms, for earlier retries
+                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); // NOTE(review): sleeps get shorter later - confirm intent
+                std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+                LOG_INFO("failed to get tablet meta")
+                        .tag("reason", cntl.ErrorText())
+                        .tag("tablet_id", tablet_id)
+                        .tag("tried", tried)
+                        .tag("sleep", duration_ms);
+                continue;
+            }
+            return Status::RpcError("failed to get tablet meta: {}", cntl.ErrorText()); // retries used up
+        }
+        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { // returned, not retried
+            return Status::NotFound("failed to get tablet meta: {}", resp.status().msg());
+        }
+        if (resp.status().code() != MetaServiceCode::OK) { // returned, not retried
+            return Status::InternalError("failed to get tablet meta: {}", resp.status().msg());
+        }
+        *tablet_meta = std::make_shared<TabletMeta>();
+        (*tablet_meta)->init_from_pb(resp.tablet_meta());
+        VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id();
+        return Status::OK();
+    }
+}
+
+Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data) {
+    return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not implemented");
+}
+
+Status CloudMetaMgr::sync_tablet_delete_bitmap(
+        Tablet* tablet, int64_t old_max_version,
+        const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+        const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap) {
+    return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented");
+}
+
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                    RowsetMetaSharedPtr* existed_rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::prepare_rowset is not implemented");
+}
+
+Status CloudMetaMgr::commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                   RowsetMetaSharedPtr* existed_rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::commit_rowset is not implemented");
+}
+
+Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
+    return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented");
+}
+
+Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) {
+    return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented");
+}
+
+Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) {
+    return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented");
+}
+
+Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) {
+    return Status::NotSupported("CloudMetaMgr::precommit_txn is not implemented");
+}
+
+Status CloudMetaMgr::get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) {
+    return Status::NotSupported("CloudMetaMgr::get_s3_info is not implemented");
+}
+
+Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) {
+    return Status::NotSupported("CloudMetaMgr::prepare_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) {
+    return Status::NotSupported("CloudMetaMgr::commit_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { // stub: always NotSupported
+    return Status::NotSupported("CloudMetaMgr::abort_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
+    return Status::NotSupported("CloudMetaMgr::lease_tablet_job is not implemented");
+}
+
+Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) {
+    return Status::NotSupported("CloudMetaMgr::update_tablet_schema is not implemented");
+}
+
+Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                          DeleteBitmap* delete_bitmap) {
+    return Status::NotSupported("CloudMetaMgr::update_delete_bitmap is not implemented");
+}
+
+Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                                   int64_t initiator) {
+    return Status::NotSupported("CloudMetaMgr::get_delete_bitmap_update_lock is not implemented");
+}
+
+} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
new file mode 100644
index 00000000000..fe65e0441ff
--- /dev/null
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "cloud/meta_mgr.h"
+#include "olap/rowset/rowset_meta.h"
+
+namespace doris::cloud {
+class TabletStatsPB;
+class TabletIndexPB;
+
+class CloudMetaMgr final : public MetaMgr {
+public:
+    CloudMetaMgr() = default;
+    ~CloudMetaMgr() override = default;
+    CloudMetaMgr(const CloudMetaMgr&) = delete;
+    CloudMetaMgr& operator=(const CloudMetaMgr&) = delete;
+
+    Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta) override;
+
+    Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false) override;
+
+    Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                          std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) override;
+
+    Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                         std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) override;
+
+    Status update_tmp_rowset(const RowsetMeta& rs_meta) override;
+
+    Status commit_txn(StreamLoadContext* ctx, bool is_2pc) override;
+
+    Status abort_txn(StreamLoadContext* ctx) override;
+
+    Status precommit_txn(StreamLoadContext* ctx) override;
+
+    Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) override;
+
+    Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) override;
+
+    Status commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) override;
+
+    Status abort_tablet_job(const TabletJobInfoPB& job) override;
+
+    Status lease_tablet_job(const TabletJobInfoPB& job) override;
+
+    Status update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) override;
+
+    Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                DeleteBitmap* delete_bitmap) override;
+
+    Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                         int64_t initiator) override;
+
+private:
+    Status sync_tablet_delete_bitmap(
+            Tablet* tablet, int64_t old_max_version,
+            const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
+            const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
+};
+
+} // namespace doris::cloud
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 12a217dfcd0..d7513574037 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -20,5 +20,14 @@
 namespace doris::config {
 
 DEFINE_String(cloud_unique_id, "");
+DEFINE_String(meta_service_endpoint, "");
+DEFINE_Bool(meta_service_use_load_balancer, "false");
+DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
+DEFINE_Bool(meta_service_connection_pooled, "true");
+DEFINE_mInt64(meta_service_connection_pool_size, "20");
+DEFINE_mInt32(meta_service_connection_age_base_minutes, "5");
+DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0");
+DEFINE_mInt32(meta_service_rpc_retry_times, "200");
+DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000");
 
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 0044ab11458..0a2ceab3e5a 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -27,4 +27,18 @@ static inline bool is_cloud_mode() {
     return !cloud_unique_id.empty();
 }
 
+DECLARE_String(meta_service_endpoint);
+// Set the underlying connection type to pooled.
+DECLARE_Bool(meta_service_connection_pooled);
+DECLARE_mInt64(meta_service_connection_pool_size);
+// A connection will expire after a random time during [base, 2*base], so that the BE
+// has a chance to connect to a new RS. Set zero to disable it.
+DECLARE_mInt32(meta_service_connection_age_base_minutes);
+// Rebuild idle connections once the idle timeout is exceeded. Set to zero to disable it.
+DECLARE_mInt32(meta_service_idle_connection_timeout_ms);
+DECLARE_mInt32(meta_service_rpc_timeout_ms);
+DECLARE_mInt32(meta_service_rpc_retry_times);
+// default brpc timeout
+DECLARE_mInt32(meta_service_brpc_timeout_ms);
+
 } // namespace doris::config
diff --git a/be/src/cloud/meta_mgr.h b/be/src/cloud/meta_mgr.h
new file mode 100644
index 00000000000..c573d43ff76
--- /dev/null
+++ b/be/src/cloud/meta_mgr.h
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "common/status.h"
+#include "util/s3_util.h"
+
+namespace doris {
+class StreamLoadContext;
+class Tablet;
+class TabletMeta;
+class RowsetMeta;
+class TabletSchema;
+class DeleteBitmap;
+
+namespace cloud {
+
+class TabletJobInfoPB;
+class StartTabletJobResponse;
+class FinishTabletJobResponse;
+
+class MetaMgr {
+public:
+    virtual ~MetaMgr() = default;
+
+    virtual Status open() { return Status::OK(); }
+
+    virtual Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta) = 0;
+
+    // If `warmup_delta_data` is true, download the new version rowset data in background
+    virtual Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false) = 0;
+
+    virtual Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                  std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) = 0;
+
+    virtual Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+                                 std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr) = 0;
+
+    virtual Status update_tmp_rowset(const RowsetMeta& rs_meta) = 0;
+
+    virtual Status commit_txn(StreamLoadContext* ctx, bool is_2pc) = 0;
+
+    virtual Status abort_txn(StreamLoadContext* ctx) = 0;
+
+    virtual Status precommit_txn(StreamLoadContext* ctx) = 0;
+
+    virtual Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) = 0;
+
+    virtual Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) = 0;
+
+    virtual Status commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) = 0;
+
+    virtual Status abort_tablet_job(const TabletJobInfoPB& job) = 0;
+
+    virtual Status lease_tablet_job(const TabletJobInfoPB& job) = 0;
+
+    virtual Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t initiator,
+                                        DeleteBitmap* delete_bitmap) = 0;
+
+    virtual Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
+                                                 int64_t initiator) = 0;
+
+    virtual Status update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) = 0;
+};
+
+} // namespace cloud
+} // namespace doris
diff --git a/be/src/util/network_util.cpp b/be/src/util/network_util.cpp
index e7953c341d4..3d93c2e183f 100644
--- a/be/src/util/network_util.cpp
+++ b/be/src/util/network_util.cpp
@@ -78,6 +78,45 @@ bool is_valid_ip(const std::string& ip) {
     return (inet_pton(AF_INET6, ip.data(), buf) > 0) || (inet_pton(AF_INET, ip.data(), buf) > 0);
 }
 
+// Splits "host:port" (an IPv6 host may be bracketed: "[::1]:80") into `host` and `port`.
+// Returns false, writing nothing, on an empty host or a missing/malformed/out-of-range port.
+bool parse_endpoint(const std::string& endpoint, std::string* host, uint16_t* port) {
+    auto p = endpoint.find_last_of(':');
+    if (p == std::string::npos || p + 1 == endpoint.size()) {
+        return false;
+    }
+
+    const char* port_base = endpoint.c_str() + p + 1;
+    char* end = nullptr;
+    long value = strtol(port_base, &end, 10);
+    // Always range-check; this used to be skipped when whitespace followed the digits.
+    if (port_base == end || value < 0 || 65535 < value) {
+        return false;
+    }
+    while (std::isspace(static_cast<unsigned char>(*end))) { // negative char is UB in isspace
+        end++;
+    }
+    if (*end) {
+        return false;
+    }
+
+    std::string::size_type i = 0;
+    const char* host_base = endpoint.c_str();
+    while (std::isspace(static_cast<unsigned char>(host_base[i]))) {
+        i++;
+    }
+    if (i < p && host_base[i] == '[' && host_base[p - 1] == ']') {
+        i += 1;
+        p -= 1;
+    }
+    if (i >= p) {
+        return false;
+    }
+    *host = endpoint.substr(i, p - i);
+    *port = static_cast<uint16_t>(value);
+    return true;
+}
+
 Status hostname_to_ip(const std::string& host, std::string& ip) {
     auto start = std::chrono::high_resolution_clock::now();
     Status status = hostname_to_ipv4(host, ip);
diff --git a/be/src/util/network_util.h b/be/src/util/network_util.h
index a9541586ef5..fe8864bd1bd 100644
--- a/be/src/util/network_util.h
+++ b/be/src/util/network_util.h
@@ -45,6 +45,8 @@ private:
 
 bool is_valid_ip(const std::string& ip);
 
+bool parse_endpoint(const std::string& endpoint, std::string* host, uint16_t* port);
+
 Status hostname_to_ip(const std::string& host, std::string& ip);
 
 Status hostname_to_ipv4(const std::string& host, std::string& ip);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org