You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/14 04:48:08 UTC

[doris] branch master updated: [feature-wip](file reader) Merge broker reader to the new file reader (#14980)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b8f93681eb [feature-wip](file reader) Merge broker reader to the new file reader (#14980)
b8f93681eb is described below

commit b8f93681eb07488263122f8ca408a0226193cdf2
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Wed Dec 14 12:48:02 2022 +0800

    [feature-wip](file reader) Merge broker reader to the new file reader (#14980)
    
    Currently, there are two sets of file readers in Doris, this pr rewrites the old broker reader with the new file reader.
    
    TODO:
    1. rewrite stream load pipe and kafka consumer pipe
---
 be/src/io/CMakeLists.txt                           |   2 +
 be/src/io/fs/broker_file_reader.cpp                | 129 +++++++++
 be/src/io/fs/broker_file_reader.h                  |  57 ++++
 be/src/io/fs/broker_file_system.cpp                | 308 +++++++++++++++++++++
 be/src/io/fs/broker_file_system.h                  |  74 +++++
 be/src/io/fs/file_system.h                         |   1 +
 be/src/io/fs/hdfs_file_reader.cpp                  |   2 +-
 be/src/io/fs/hdfs_file_system.cpp                  |   9 +-
 be/src/util/doris_metrics.cpp                      |   4 +
 be/src/util/doris_metrics.h                        |   2 +
 .../exec/format/file_reader/new_file_factory.cpp   |  18 ++
 .../vec/exec/format/file_reader/new_file_factory.h |   5 +
 12 files changed, 608 insertions(+), 3 deletions(-)

diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 402a90a800..e3125e946a 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -43,6 +43,8 @@ set(IO_FILES
     fs/s3_file_writer.cpp
     fs/hdfs_file_system.cpp
     fs/hdfs_file_reader.cpp
+    fs/broker_file_system.cpp
+    fs/broker_file_reader.cpp
     cache/dummy_file_cache.cpp
     cache/file_cache.cpp
     cache/file_cache_manager.cpp
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
new file mode 100644
index 0000000000..e8e54cb736
--- /dev/null
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/broker_file_reader.h"
+
+#include <gen_cpp/TPaloBrokerService.h>
+
+#include "common/status.h"
+#include "io/fs/broker_file_system.h"
+#include "util/doris_metrics.h"
+
+namespace doris {
+namespace io {
+
+BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
+                                   size_t file_size, TBrokerFD fd, BrokerFileSystem* fs)
+        : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(fs) {
+    DorisMetrics::instance()->broker_file_open_reading->increment(1);
+    DorisMetrics::instance()->broker_file_reader_total->increment(1);
+}
+
+BrokerFileReader::~BrokerFileReader() {
+    close();
+}
+
+Status BrokerFileReader::close() {
+    bool expected = false;
+    if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
+        TBrokerCloseReaderRequest request;
+        request.__set_version(TBrokerVersion::VERSION_ONE);
+        request.__set_fd(_fd);
+
+        TBrokerOperationStatus response;
+        try {
+            std::shared_ptr<BrokerServiceConnection> client;
+            RETURN_IF_ERROR(_fs->get_client(&client));
+            try {
+                (*client)->closeReader(response, request);
+            } catch (apache::thrift::transport::TTransportException& e) {
+                std::this_thread::sleep_for(std::chrono::seconds(1));
+                RETURN_IF_ERROR((*client).reopen());
+                (*client)->closeReader(response, request);
+            }
+        } catch (apache::thrift::TException& e) {
+            std::stringstream ss;
+            ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
+            return Status::RpcError(ss.str());
+        }
+
+        if (response.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream ss;
+            ss << "close broker reader failed, broker:" << _broker_addr
+               << " failed:" << response.message;
+            return Status::InternalError(ss.str());
+        }
+
+        DorisMetrics::instance()->broker_file_open_reading->increment(-1);
+    }
+    return Status::OK();
+}
+
+Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/,
+                                 size_t* bytes_read) {
+    DCHECK(!closed());
+    size_t bytes_req = result.size;
+    char* to = result.data;
+    *bytes_read = 0;
+    if (UNLIKELY(bytes_req == 0)) {
+        return Status::OK();
+    }
+
+    TBrokerPReadRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_fd(_fd);
+    request.__set_offset(offset);
+    request.__set_length(bytes_req);
+
+    TBrokerReadResponse response;
+    std::shared_ptr<BrokerServiceConnection> client;
+    RETURN_IF_ERROR(_fs->get_client(&client));
+    try {
+        VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
+                 << ", read bytes length:" << bytes_req;
+        try {
+            (*client)->pread(response, request);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*client).reopen());
+            LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
+            (*client)->pread(response, request);
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
+        // read the end of broker's file
+        *bytes_read = 0;
+        return Status::OK();
+    } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker:" << _broker_addr
+           << " failed:" << response.opStatus.message;
+        return Status::InternalError(ss.str());
+    }
+
+    *bytes_read = response.data.size();
+    memcpy(to, response.data.data(), *bytes_read);
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
new file mode 100644
index 0000000000..e0565dc656
--- /dev/null
+++ b/be/src/io/fs/broker_file_reader.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PaloBrokerService_types.h>
+
+#include "io/fs/file_reader.h"
+namespace doris {
+namespace io {
+
+class BrokerFileSystem;
+
+class BrokerFileReader : public FileReader {
+public:
+    BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size,
+                     TBrokerFD fd, BrokerFileSystem* fs);
+
+    ~BrokerFileReader() override;
+
+    Status close() override;
+
+    Status read_at(size_t offset, Slice result, const IOContext& io_ctx,
+                   size_t* bytes_read) override;
+
+    const Path& path() const override { return _path; }
+
+    size_t size() const override { return _file_size; }
+
+    bool closed() const override { return _closed.load(std::memory_order_acquire); }
+
+private:
+    const Path& _path;
+    size_t _file_size;
+
+    const TNetworkAddress& _broker_addr;
+    TBrokerFD _fd;
+
+    BrokerFileSystem* _fs;
+    std::atomic<bool> _closed = false;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
new file mode 100644
index 0000000000..e68edb6253
--- /dev/null
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/broker_file_system.h"
+
+#include <gen_cpp/PaloBrokerService_types.h>
+#include <gen_cpp/TPaloBrokerService.h>
+
+#include "io/fs/broker_file_reader.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/storage_backend.h"
+
+namespace doris {
+namespace io {
+
+#ifdef BE_TEST
+inline BrokerServiceClientCache* client_cache() {
+    static BrokerServiceClientCache s_client_cache;
+    return &s_client_cache;
+}
+
+inline const std::string& client_id(const TNetworkAddress& addr) {
+    static std::string s_client_id = "doris_unit_test";
+    return s_client_id;
+}
+#else
+inline BrokerServiceClientCache* client_cache() {
+    return ExecEnv::GetInstance()->broker_client_cache();
+}
+
+inline const std::string& client_id(const TNetworkAddress& addr) {
+    return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr);
+}
+#endif
+
+#ifndef CHECK_BROKER_CLIENT
+#define CHECK_BROKER_CLIENT(client)                               \
+    if (!client) {                                                \
+        return Status::InternalError("init Broker client error"); \
+    }
+#endif
+
+BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
+                                   const std::map<std::string, std::string>& broker_prop)
+        : RemoteFileSystem("", "", FileSystemType::BROKER),
+          _broker_addr(broker_addr),
+          _broker_prop(broker_prop) {}
+
+Status BrokerFileSystem::connect() {
+    Status status = Status::OK();
+    _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr,
+                                              config::thrift_rpc_timeout_ms, &status));
+    if (!status.ok()) {
+        std::stringstream ss;
+        ss << "failed to get broker client. "
+           << "broker addr: " << _broker_addr << ". msg: " << status.get_error_msg();
+        status = Status::InternalError(ss.str());
+    }
+    return status;
+}
+
+Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+    CHECK_BROKER_CLIENT(_client);
+    TBrokerOpenReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_path(path);
+    request.__set_startOffset(0);
+    request.__set_clientId(client_id(_broker_addr));
+    request.__set_properties(_broker_prop);
+
+    TBrokerOpenReaderResponse* response = new TBrokerOpenReaderResponse();
+    Defer del_reponse {[&] { delete response; }};
+    try {
+        Status status;
+        try {
+            (*_client)->openReader(*response, request);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->openReader(*response, request);
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker:" << _broker_addr << " failed: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr
+           << " failed: " << response->opStatus.message;
+        return Status::InternalError(ss.str());
+    }
+    // TODO(cmy): The file size is no longer got from openReader() method.
+    // But leave the code here for compatibility.
+    // This will be removed later.
+    size_t file_size = 0;
+    TBrokerFD fd;
+    if (response->__isset.size) {
+        file_size = response->size;
+    }
+    fd = response->fd;
+    *reader = std::make_shared<BrokerFileReader>(_broker_addr, path, file_size, fd, this);
+    return Status::OK();
+}
+
+Status BrokerFileSystem::delete_file(const Path& path) {
+    CHECK_BROKER_CLIENT(_client);
+    try {
+        // rm file from remote path
+        TBrokerDeletePathRequest del_req;
+        TBrokerOperationStatus del_rep;
+        del_req.__set_version(TBrokerVersion::VERSION_ONE);
+        del_req.__set_path(path);
+        del_req.__set_properties(_broker_prop);
+
+        try {
+            (*_client)->deletePath(del_rep, del_req);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->deletePath(del_rep, del_req);
+        }
+
+        if (del_rep.statusCode == TBrokerOperationStatusCode::OK) {
+            return Status::OK();
+        } else {
+            std::stringstream ss;
+            ss << "failed to delete from remote path: " << path << ", msg: " << del_rep.message;
+            return Status::InternalError(ss.str());
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "failed to delete file in remote path: " << path << ", msg: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+}
+
+Status BrokerFileSystem::create_directory(const Path& /*path*/) {
+    return Status::NotSupported("create directory not implemented!");
+}
+
+// Delete all files under path.
+Status BrokerFileSystem::delete_directory(const Path& path) {
+    return delete_file(path);
+}
+
+Status BrokerFileSystem::exists(const Path& path, bool* res) const {
+    CHECK_BROKER_CLIENT(_client);
+    *res = false;
+    try {
+        TBrokerCheckPathExistRequest check_req;
+        TBrokerCheckPathExistResponse check_rep;
+        check_req.__set_version(TBrokerVersion::VERSION_ONE);
+        check_req.__set_path(path);
+        check_req.__set_properties(_broker_prop);
+
+        try {
+            (*_client)->checkPathExist(check_rep, check_req);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->checkPathExist(check_rep, check_req);
+        }
+
+        if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream ss;
+            ss << "failed to check exist: " << path << ", msg: " << check_rep.opStatus.message;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        } else if (!check_rep.isPathExist) {
+            return Status::NotFound("{} not exists!", path.string());
+        } else {
+            *res = true;
+            return Status::OK();
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "failed to check exist: " << path << ", msg: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+}
+
+Status BrokerFileSystem::file_size(const Path& path, size_t* file_size) const {
+    CHECK_BROKER_CLIENT(_client);
+    TBrokerOpenReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_path(path);
+    request.__set_startOffset(0);
+    request.__set_clientId(client_id(_broker_addr));
+    request.__set_properties(_broker_prop);
+
+    TBrokerOpenReaderResponse* response = new TBrokerOpenReaderResponse();
+    Defer del_reponse {[&] { delete response; }};
+    try {
+        Status status;
+        try {
+            (*_client)->openReader(*response, request);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->openReader(*response, request);
+        }
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr << " failed: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr
+           << " failed: " << response->opStatus.message;
+        return Status::RpcError(ss.str());
+    }
+    // TODO(cmy): The file size is no longer got from openReader() method.
+    // But leave the code here for compatibility.
+    // This will be removed later.
+    if (response->__isset.size) {
+        *file_size = response->size;
+    }
+    return Status::OK();
+}
+
+Status BrokerFileSystem::list(const Path& path, std::vector<Path>* files) {
+    CHECK_BROKER_CLIENT(_client);
+    Status status = Status::OK();
+    try {
+        // get existing files from remote path
+        TBrokerListResponse list_rep;
+        TBrokerListPathRequest list_req;
+        list_req.__set_version(TBrokerVersion::VERSION_ONE);
+        list_req.__set_path(path / "*");
+        list_req.__set_isRecursive(false);
+        list_req.__set_properties(_broker_prop);
+        list_req.__set_fileNameOnly(true); // we only need file name, not abs path
+
+        try {
+            (*_client)->listPath(list_rep, list_req);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->listPath(list_rep, list_req);
+        }
+
+        if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) {
+            LOG(INFO) << "path does not exist: " << path;
+            return Status::OK();
+        } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream ss;
+            ss << "failed to list files from remote path: " << path
+               << ", msg: " << list_rep.opStatus.message;
+            return Status::InternalError(ss.str());
+        }
+        LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size();
+
+        // split file name and checksum
+        for (const auto& file : list_rep.files) {
+            if (file.isDir) {
+                // this is not a file
+                continue;
+            }
+
+            const std::string& file_name = file.path;
+            size_t pos = file_name.find_last_of('.');
+            if (pos == std::string::npos || pos == file_name.size() - 1) {
+                // Not found checksum separator, ignore this file
+                continue;
+            }
+
+            FileStat stat = {std::string(file_name, 0, pos), std::string(file_name, pos + 1),
+                             file.size};
+            files->emplace_back(std::string(file_name, 0, pos));
+            VLOG(2) << "split remote file: " << std::string(file_name, 0, pos)
+                    << ", checksum: " << std::string(file_name, pos + 1);
+        }
+
+        LOG(INFO) << "finished to split files. valid file num: " << files->size();
+
+    } catch (apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "failed to list files in remote path: " << path << ", msg: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+    return status;
+}
+
+Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const {
+    CHECK_BROKER_CLIENT(_client);
+    *client = _client;
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
new file mode 100644
index 0000000000..5b6cce33ba
--- /dev/null
+++ b/be/src/io/fs/broker_file_system.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/remote_file_system.h"
+#include "runtime/client_cache.h"
+namespace doris {
+
+namespace io {
+class BrokerFileSystem final : public RemoteFileSystem {
+public:
+    BrokerFileSystem(const TNetworkAddress& broker_addr,
+                     const std::map<std::string, std::string>& broker_prop);
+    ~BrokerFileSystem() override = default;
+
+    Status create_file(const Path& /*path*/, FileWriterPtr* /*writer*/) override {
+        return Status::NotSupported("Currently not support to create file through broker.");
+    }
+
+    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+
+    Status delete_file(const Path& path) override;
+
+    Status create_directory(const Path& path) override;
+
+    // Delete all files under path.
+    Status delete_directory(const Path& path) override;
+
+    Status link_file(const Path& /*src*/, const Path& /*dest*/) override {
+        return Status::NotSupported("Not supported link file through broker.");
+    }
+
+    Status exists(const Path& path, bool* res) const override;
+
+    Status file_size(const Path& path, size_t* file_size) const override;
+
+    Status list(const Path& path, std::vector<Path>* files) override;
+
+    Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) override {
+        return Status::NotSupported("Currently not support to upload file to HDFS");
+    }
+
+    Status batch_upload(const std::vector<Path>& /*local_paths*/,
+                        const std::vector<Path>& /*dest_paths*/) override {
+        return Status::NotSupported("Currently not support to batch upload file to HDFS");
+    }
+
+    Status connect() override;
+
+    Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const;
+
+private:
+    const TNetworkAddress& _broker_addr;
+    const std::map<std::string, std::string>& _broker_prop;
+
+    std::shared_ptr<BrokerServiceConnection> _client;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index d97148f4c5..2dde2a64e7 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -38,6 +38,7 @@ enum class FileSystemType : uint8_t {
     LOCAL,
     S3,
     HDFS,
+    BROKER,
 };
 
 class FileSystem {
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 1b77e64238..ef03541387 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -64,8 +64,8 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i
     size_t bytes_req = result.size;
     char* to = result.data;
     bytes_req = std::min(bytes_req, _file_size - offset);
+    *bytes_read = 0;
     if (UNLIKELY(bytes_req == 0)) {
-        *bytes_read = 0;
         return Status::OK();
     }
 
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index d219e0393a..cafa7ca34c 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -96,7 +96,7 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer
     // }
     // hdfsCloseFile(handle->hdfs_fs, hdfs_file);
     // return Status::OK();
-    return Status::NotSupported("Currently not support to upload file to HDFS");
+    return Status::NotSupported("Currently not support to create file to HDFS");
 }
 
 Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
@@ -161,7 +161,12 @@ Status HdfsFileSystem::delete_directory(const Path& path) {
 
 Status HdfsFileSystem::exists(const Path& path, bool* res) const {
     CHECK_HDFS_HANDLE(_fs_handle);
-    *res = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+    int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+    if (is_exists == 0) {
+        *res = true;
+    } else {
+        *res = false;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index d3bebc7eb0..e5dad525fb 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -171,6 +171,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hdfs_file_reader_total, MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(broker_file_reader_total, MetricUnit::FILESYSTEM);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM);
@@ -183,6 +184,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYS
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hdfs_file_open_reading, MetricUnit::FILESYSTEM);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_file_open_reading, MetricUnit::FILESYSTEM);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM);
 
@@ -301,6 +303,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, broker_file_reader_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total);
@@ -312,6 +315,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading);
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading);
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, hdfs_file_open_reading);
+    INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading);
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);
 }
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index e59009a99c..a43858d055 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -155,6 +155,7 @@ public:
     IntCounter* local_file_reader_total;
     IntCounter* s3_file_reader_total;
     IntCounter* hdfs_file_reader_total;
+    IntCounter* broker_file_reader_total;
     IntCounter* local_file_writer_total;
     IntCounter* s3_file_writer_total;
     IntCounter* file_created_total;
@@ -166,6 +167,7 @@ public:
     IntGauge* local_file_open_reading;
     IntGauge* s3_file_open_reading;
     IntGauge* hdfs_file_open_reading;
+    IntGauge* broker_file_open_reading;
     IntGauge* local_file_open_writing;
     IntGauge* s3_file_open_writing;
 
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
index 23d6fa4eab..b8f2252885 100644
--- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp
+++ b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
@@ -20,6 +20,7 @@
 #include "io/broker_reader.h"
 #include "io/broker_writer.h"
 #include "io/buffered_reader.h"
+#include "io/fs/broker_file_system.h"
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/s3_file_system.h"
@@ -126,6 +127,12 @@ Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/,
                                            &file_system_ptr, file_reader));
         break;
     }
+    case TFileType::FILE_BROKER: {
+        RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
+                                             system_properties.properties, file_description.path,
+                                             &file_system_ptr, file_reader));
+        break;
+    }
     default:
         return Status::NotSupported("unsupported file reader type: {}", std::to_string(type));
     }
@@ -179,4 +186,15 @@ Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string>
     (*s3_file_system)->open_file(s3_uri.get_key(), reader);
     return Status::OK();
 }
+
+Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
+                                            const std::map<std::string, std::string>& prop,
+                                            const std::string& path,
+                                            io::FileSystem** broker_file_system,
+                                            io::FileReaderSPtr* reader) {
+    *broker_file_system = new io::BrokerFileSystem(broker_addr, prop);
+    (dynamic_cast<io::BrokerFileSystem*>(*broker_file_system))->connect();
+    (*broker_file_system)->open_file(path, reader);
+    return Status::OK();
+}
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h b/be/src/vec/exec/format/file_reader/new_file_factory.h
index 82fc1fe6cf..8167618859 100644
--- a/be/src/vec/exec/format/file_reader/new_file_factory.h
+++ b/be/src/vec/exec/format/file_reader/new_file_factory.h
@@ -92,6 +92,11 @@ public:
                                    const std::string& path, io::FileSystem** s3_file_system,
                                    io::FileReaderSPtr* reader);
 
+    static Status create_broker_reader(const TNetworkAddress& broker_addr,
+                                       const std::map<std::string, std::string>& prop,
+                                       const std::string& path, io::FileSystem** hdfs_file_system,
+                                       io::FileReaderSPtr* reader);
+
     static TFileType::type convert_storage_type(TStorageBackendType::type type) {
         switch (type) {
         case TStorageBackendType::LOCAL:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org