You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/14 04:48:08 UTC
[doris] branch master updated: [feature-wip](file reader) Merge broker reader to the new file reader (#14980)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b8f93681eb [feature-wip](file reader) Merge broker reader to the new file reader (#14980)
b8f93681eb is described below
commit b8f93681eb07488263122f8ca408a0226193cdf2
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Wed Dec 14 12:48:02 2022 +0800
[feature-wip](file reader) Merge broker reader to the new file reader (#14980)
Currently, there are two sets of file readers in Doris, this pr rewrites the old broker reader with the new file reader.
TODO:
1. rewrite stream load pipe and kafka consumer pipe
---
be/src/io/CMakeLists.txt | 2 +
be/src/io/fs/broker_file_reader.cpp | 129 +++++++++
be/src/io/fs/broker_file_reader.h | 57 ++++
be/src/io/fs/broker_file_system.cpp | 308 +++++++++++++++++++++
be/src/io/fs/broker_file_system.h | 74 +++++
be/src/io/fs/file_system.h | 1 +
be/src/io/fs/hdfs_file_reader.cpp | 2 +-
be/src/io/fs/hdfs_file_system.cpp | 9 +-
be/src/util/doris_metrics.cpp | 4 +
be/src/util/doris_metrics.h | 2 +
.../exec/format/file_reader/new_file_factory.cpp | 18 ++
.../vec/exec/format/file_reader/new_file_factory.h | 5 +
12 files changed, 608 insertions(+), 3 deletions(-)
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 402a90a800..e3125e946a 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -43,6 +43,8 @@ set(IO_FILES
fs/s3_file_writer.cpp
fs/hdfs_file_system.cpp
fs/hdfs_file_reader.cpp
+ fs/broker_file_system.cpp
+ fs/broker_file_reader.cpp
cache/dummy_file_cache.cpp
cache/file_cache.cpp
cache/file_cache_manager.cpp
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
new file mode 100644
index 0000000000..e8e54cb736
--- /dev/null
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/broker_file_reader.h"
+
+#include <gen_cpp/TPaloBrokerService.h>
+
+#include "common/status.h"
+#include "io/fs/broker_file_system.h"
+#include "util/doris_metrics.h"
+
+namespace doris {
+namespace io {
+
+// Binds the reader to the broker handle `fd` for `path` and registers it with
+// the broker metrics: the "open for reading" gauge and the "readers created"
+// counter are both bumped by one.
+BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
+                                   size_t file_size, TBrokerFD fd, BrokerFileSystem* fs)
+        : _path(path), _file_size(file_size), _broker_addr(broker_addr), _fd(fd), _fs(fs) {
+    auto* metrics = DorisMetrics::instance();
+    metrics->broker_file_open_reading->increment(1);
+    metrics->broker_file_reader_total->increment(1);
+}
+
+// Closes the broker-side reader if the owner has not done so explicitly.
+// A destructor cannot propagate the Status, so a failed close is logged
+// instead of being silently discarded.
+BrokerFileReader::~BrokerFileReader() {
+    Status st = close();
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to close broker reader in destructor: " << st.get_error_msg();
+    }
+}
+
+// Releases the reader handle held by the broker.
+//
+// Only the first call issues the closeReader RPC; every later call returns OK
+// without doing anything. Returns RpcError when the RPC fails (after one
+// reconnect-and-retry) and InternalError when the broker answers with a
+// non-OK status.
+Status BrokerFileReader::close() {
+    bool expected = false;
+    if (!_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
+        return Status::OK();
+    }
+
+    // The reader is now permanently marked closed, so it has to leave the
+    // "open for reading" gauge right here. Decrementing only after a successful
+    // RPC would leak one gauge count per failed close, because a second
+    // close() is a no-op.
+    DorisMetrics::instance()->broker_file_open_reading->increment(-1);
+
+    TBrokerCloseReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_fd(_fd);
+
+    TBrokerOperationStatus response;
+    try {
+        std::shared_ptr<BrokerServiceConnection> client;
+        RETURN_IF_ERROR(_fs->get_client(&client));
+        try {
+            (*client)->closeReader(response, request);
+        } catch (const apache::thrift::transport::TTransportException&) {
+            // Retry exactly once on a re-opened connection after a short pause.
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*client).reopen());
+            (*client)->closeReader(response, request);
+        }
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "close broker reader failed, broker:" << _broker_addr
+           << " failed:" << response.message;
+        return Status::InternalError(ss.str());
+    }
+    return Status::OK();
+}
+
+// Reads up to `result.size` bytes starting at `offset` into `result.data`
+// with a single pread RPC to the broker.
+//
+// `*bytes_read` receives the number of bytes actually copied; it is 0 when
+// nothing was requested or the broker reports END_OF_FILE (both return OK).
+// Returns RpcError when the RPC fails after one reconnect-and-retry, and
+// InternalError when the broker reports a non-OK status or returns more data
+// than was requested.
+Status BrokerFileReader::read_at(size_t offset, Slice result, const IOContext& /*io_ctx*/,
+                                 size_t* bytes_read) {
+    DCHECK(!closed());
+    size_t bytes_req = result.size;
+    char* to = result.data;
+    *bytes_read = 0;
+    if (UNLIKELY(bytes_req == 0)) {
+        return Status::OK();
+    }
+
+    TBrokerPReadRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_fd(_fd);
+    request.__set_offset(offset);
+    request.__set_length(bytes_req);
+
+    TBrokerReadResponse response;
+    std::shared_ptr<BrokerServiceConnection> client;
+    RETURN_IF_ERROR(_fs->get_client(&client));
+    try {
+        VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
+                 << ", read bytes length:" << bytes_req;
+        try {
+            (*client)->pread(response, request);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            // Retry exactly once on a re-opened connection after a short pause.
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*client).reopen());
+            LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
+            (*client)->pread(response, request);
+        }
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Read from broker failed, broker:" << _broker_addr << " failed:" << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
+        // read the end of broker's file
+        *bytes_read = 0;
+        return Status::OK();
+    } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Read from broker failed, broker:" << _broker_addr
+           << " failed:" << response.opStatus.message;
+        return Status::InternalError(ss.str());
+    }
+
+    // The payload length comes from the remote side; copying it unchecked
+    // would write past the end of `result` if the broker over-delivers.
+    const size_t bytes_got = response.data.size();
+    if (UNLIKELY(bytes_got > bytes_req)) {
+        std::stringstream ss;
+        ss << "Read from broker failed, broker:" << _broker_addr << " returned " << bytes_got
+           << " bytes but only " << bytes_req << " were requested";
+        return Status::InternalError(ss.str());
+    }
+
+    memcpy(to, response.data.data(), bytes_got);
+    *bytes_read = bytes_got;
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
new file mode 100644
index 0000000000..e0565dc656
--- /dev/null
+++ b/be/src/io/fs/broker_file_reader.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PaloBrokerService_types.h>
+
+#include "io/fs/file_reader.h"
+namespace doris {
+namespace io {
+
+class BrokerFileSystem;
+
+// FileReader that serves positional reads of one remote file by issuing RPCs
+// to a broker on a handle (`fd`) the broker returned from openReader.
+class BrokerFileReader : public FileReader {
+public:
+    // `fs` is not owned; close() and read_at() fetch the RPC client from it,
+    // so it must stay alive for as long as this reader is used.
+    BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size,
+                     TBrokerFD fd, BrokerFileSystem* fs);
+
+    // Closes the broker-side handle if close() was not called explicitly.
+    ~BrokerFileReader() override;
+
+    // Releases the broker-side handle; only the first call does any work.
+    Status close() override;
+
+    // Reads up to `result.size` bytes at `offset`; `*bytes_read` gets the
+    // number of bytes copied into `result.data`.
+    Status read_at(size_t offset, Slice result, const IOContext& io_ctx,
+                   size_t* bytes_read) override;
+
+    const Path& path() const override { return _path; }
+
+    // Size passed in at construction time.
+    size_t size() const override { return _file_size; }
+
+    bool closed() const override { return _closed.load(std::memory_order_acquire); }
+
+private:
+    // Held by value on purpose. Callers may pass a temporary (for example a
+    // std::string implicitly converted to Path), and a reference member would
+    // dangle as soon as the constructor call finishes.
+    Path _path;
+    size_t _file_size;
+
+    // Copied for the same reason: the reader must not depend on the lifetime
+    // of the caller's address object.
+    TNetworkAddress _broker_addr;
+    TBrokerFD _fd;
+
+    BrokerFileSystem* _fs;
+    std::atomic<bool> _closed = false;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
new file mode 100644
index 0000000000..e68edb6253
--- /dev/null
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/broker_file_system.h"
+
+#include <gen_cpp/PaloBrokerService_types.h>
+#include <gen_cpp/TPaloBrokerService.h>
+
+#include "io/fs/broker_file_reader.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/storage_backend.h"
+
+namespace doris {
+namespace io {
+
+// Accessors for the broker client cache and the client id sent in broker
+// requests. BE_TEST builds use function-local statics; all other builds read
+// both from the process-wide ExecEnv.
+#ifdef BE_TEST
+// Test build: a private, function-local client cache.
+inline BrokerServiceClientCache* client_cache() {
+ static BrokerServiceClientCache s_client_cache;
+ return &s_client_cache;
+}
+
+// Test build: a fixed client id; `addr` is ignored.
+inline const std::string& client_id(const TNetworkAddress& addr) {
+ static std::string s_client_id = "doris_unit_test";
+ return s_client_id;
+}
+#else
+// Regular build: the broker client cache owned by ExecEnv.
+inline BrokerServiceClientCache* client_cache() {
+ return ExecEnv::GetInstance()->broker_client_cache();
+}
+
+// Regular build: the client id the broker manager reports for `addr`.
+inline const std::string& client_id(const TNetworkAddress& addr) {
+ return ExecEnv::GetInstance()->broker_mgr()->get_client_id(addr);
+}
+#endif
+
+#ifndef CHECK_BROKER_CLIENT
+#define CHECK_BROKER_CLIENT(client) \
+ if (!client) { \
+ return Status::InternalError("init Broker client error"); \
+ }
+#endif
+
+// Records the broker address and the properties sent with every broker
+// request. No connection is made here; connect() must be called before any
+// other operation. The RemoteFileSystem base is initialised with two empty
+// strings and FileSystemType::BROKER.
+// NOTE(review): the class declaration in this change stores both arguments
+// in reference members, so the caller's objects must outlive this file
+// system.
+BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
+ const std::map<std::string, std::string>& broker_prop)
+ : RemoteFileSystem("", "", FileSystemType::BROKER),
+ _broker_addr(broker_addr),
+ _broker_prop(broker_prop) {}
+
+// Obtains a broker connection from the client cache and stores it in
+// `_client`. Must succeed before any other operation is used.
+//
+// Returns InternalError (with the broker address and the underlying message)
+// when the connection cannot be obtained. In that case `_client` is cleared
+// so later operations fail in CHECK_BROKER_CLIENT instead of issuing RPCs on
+// a connection whose construction reported an error.
+Status BrokerFileSystem::connect() {
+    Status status = Status::OK();
+    _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr,
+                                              config::thrift_rpc_timeout_ms, &status));
+    if (!status.ok()) {
+        _client.reset();
+        std::stringstream ss;
+        ss << "failed to get broker client. "
+           << "broker addr: " << _broker_addr << ". msg: " << status.get_error_msg();
+        status = Status::InternalError(ss.str());
+    }
+    return status;
+}
+
+// Opens `path` for reading through the broker and stores a BrokerFileReader
+// bound to the returned broker handle in `*reader`.
+//
+// The reader keeps a raw pointer to this file system, so the file system must
+// outlive every reader it hands out.
+//
+// Returns InternalError when no client is available or the broker answers
+// with a non-OK status, and RpcError when the RPC still fails after one
+// reconnect-and-retry.
+Status BrokerFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
+    CHECK_BROKER_CLIENT(_client);
+    TBrokerOpenReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_path(path);
+    request.__set_startOffset(0);
+    request.__set_clientId(client_id(_broker_addr));
+    request.__set_properties(_broker_prop);
+
+    // The response never leaves this function, so it lives on the stack; no
+    // manual new/delete is needed.
+    TBrokerOpenReaderResponse response;
+    try {
+        try {
+            (*_client)->openReader(response, request);
+        } catch (const apache::thrift::transport::TTransportException&) {
+            // Retry exactly once on a re-opened connection after a short pause.
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->openReader(response, request);
+        }
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker:" << _broker_addr << " failed: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr
+           << " failed: " << response.opStatus.message;
+        return Status::InternalError(ss.str());
+    }
+    // TODO(cmy): The file size is no longer got from openReader() method.
+    // But leave the code here for compatibility.
+    // This will be removed later.
+    size_t file_size = 0;
+    if (response.__isset.size) {
+        file_size = response.size;
+    }
+    const TBrokerFD fd = response.fd;
+    *reader = std::make_shared<BrokerFileReader>(_broker_addr, path, file_size, fd, this);
+    return Status::OK();
+}
+
+// Asks the broker to delete `path`.
+//
+// Returns OK when the broker reports success, InternalError when no client is
+// available or the broker answers with a non-OK status, and RpcError when
+// the RPC fails with a thrift exception (a transport error is retried once
+// on a re-opened connection first).
+Status BrokerFileSystem::delete_file(const Path& path) {
+    CHECK_BROKER_CLIENT(_client);
+    try {
+        // rm file from remote path
+        TBrokerDeletePathRequest request;
+        TBrokerOperationStatus result;
+        request.__set_version(TBrokerVersion::VERSION_ONE);
+        request.__set_path(path);
+        request.__set_properties(_broker_prop);
+
+        try {
+            (*_client)->deletePath(result, request);
+        } catch (apache::thrift::transport::TTransportException& e) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->deletePath(result, request);
+        }
+
+        if (result.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream err;
+            err << "failed to delete from remote path: " << path << ", msg: " << result.message;
+            return Status::InternalError(err.str());
+        }
+        return Status::OK();
+    } catch (apache::thrift::TException& e) {
+        std::stringstream err;
+        err << "failed to delete file in remote path: " << path << ", msg: " << e.what();
+        return Status::RpcError(err.str());
+    }
+}
+
+// Not implemented for the broker file system: always returns NotSupported and
+// ignores the path argument.
+Status BrokerFileSystem::create_directory(const Path& /*path*/) {
+ return Status::NotSupported("create directory not implemented!");
+}
+
+// Delete all files under path.
+// Implemented by forwarding to delete_file(), which sends one deletePath
+// request for `path` and returns that call's Status.
+// NOTE(review): whether the broker removes directory contents recursively
+// for such a request is not visible here - confirm against the broker.
+Status BrokerFileSystem::delete_directory(const Path& path) {
+ return delete_file(path);
+}
+
+// Asks the broker whether `path` exists and stores the answer in `*res`.
+//
+// A missing path is a normal outcome: the call returns OK with
+// `*res == false`, in line with the HDFS file system's exists() in this
+// change. `*res` is also false whenever an error is returned.
+//
+// Returns InternalError when no client is available or the broker answers
+// with a non-OK status, and RpcError when the RPC fails with a thrift
+// exception (a transport error is retried once on a re-opened connection
+// first).
+Status BrokerFileSystem::exists(const Path& path, bool* res) const {
+    CHECK_BROKER_CLIENT(_client);
+    *res = false;
+    try {
+        TBrokerCheckPathExistRequest check_req;
+        TBrokerCheckPathExistResponse check_rep;
+        check_req.__set_version(TBrokerVersion::VERSION_ONE);
+        check_req.__set_path(path);
+        check_req.__set_properties(_broker_prop);
+
+        try {
+            (*_client)->checkPathExist(check_rep, check_req);
+        } catch (const apache::thrift::transport::TTransportException&) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->checkPathExist(check_rep, check_req);
+        }
+
+        if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream ss;
+            ss << "failed to check exist: " << path << ", msg: " << check_rep.opStatus.message;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        // "Does not exist" is an answer, not a failure: report it through
+        // `*res` so callers can tell it apart from a failed check.
+        *res = check_rep.isPathExist;
+        return Status::OK();
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "failed to check exist: " << path << ", msg: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+}
+
+// Determines the size of `path` by opening a broker reader and taking the
+// size field of the openReader response.
+//
+// On success `*file_size` is always written: the reported size, or 0 when
+// the broker omits the size field (the same fallback open_file() uses). On
+// error `*file_size` is left untouched.
+//
+// The reader handle the broker allocated for this probe is closed again
+// before returning. Closing is best effort; a failure is only logged because
+// the size has already been obtained.
+//
+// Returns InternalError when no client is available, and RpcError when the
+// RPC fails after one reconnect-and-retry or the broker answers with a
+// non-OK status.
+Status BrokerFileSystem::file_size(const Path& path, size_t* file_size) const {
+    CHECK_BROKER_CLIENT(_client);
+    TBrokerOpenReaderRequest request;
+    request.__set_version(TBrokerVersion::VERSION_ONE);
+    request.__set_path(path);
+    request.__set_startOffset(0);
+    request.__set_clientId(client_id(_broker_addr));
+    request.__set_properties(_broker_prop);
+
+    // The response never leaves this function, so it lives on the stack.
+    TBrokerOpenReaderResponse response;
+    try {
+        try {
+            (*_client)->openReader(response, request);
+        } catch (const apache::thrift::transport::TTransportException&) {
+            // Retry exactly once on a re-opened connection after a short pause.
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->openReader(response, request);
+        }
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr << " failed: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+
+    if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+        std::stringstream ss;
+        ss << "Open broker reader failed, broker: " << _broker_addr
+           << " failed: " << response.opStatus.message;
+        return Status::RpcError(ss.str());
+    }
+    // TODO(cmy): The file size is no longer got from openReader() method.
+    // But leave the code here for compatibility.
+    // This will be removed later.
+    *file_size = response.__isset.size ? static_cast<size_t>(response.size) : 0;
+
+    // openReader() handed out a reader handle; release it so that size
+    // probes do not pile up open readers on the broker.
+    TBrokerCloseReaderRequest close_req;
+    close_req.__set_version(TBrokerVersion::VERSION_ONE);
+    close_req.__set_fd(response.fd);
+    TBrokerOperationStatus close_rep;
+    try {
+        (*_client)->closeReader(close_rep, close_req);
+        if (close_rep.statusCode != TBrokerOperationStatusCode::OK) {
+            LOG(WARNING) << "failed to close broker reader after getting file size, broker: "
+                         << _broker_addr << ", msg: " << close_rep.message;
+        }
+    } catch (const apache::thrift::TException& e) {
+        LOG(WARNING) << "failed to close broker reader after getting file size, broker: "
+                     << _broker_addr << ", msg: " << e.what();
+    }
+    return Status::OK();
+}
+
+// Lists the regular files directly under `path` (non-recursive) and appends
+// them to `*files`.
+//
+// Each remote name is expected to look like "<name>.<checksum>": only the
+// part before the last '.' is appended. Directories, and names that have no
+// '.' or end with '.', are skipped. A path the broker reports as
+// FILE_NOT_FOUND yields OK with nothing appended.
+//
+// Returns InternalError when no client is available or the broker answers
+// with another non-OK status, and RpcError when the RPC fails with a thrift
+// exception (a transport error is retried once on a re-opened connection
+// first).
+Status BrokerFileSystem::list(const Path& path, std::vector<Path>* files) {
+    CHECK_BROKER_CLIENT(_client);
+    try {
+        // get existing files from remote path
+        TBrokerListResponse list_rep;
+        TBrokerListPathRequest list_req;
+        list_req.__set_version(TBrokerVersion::VERSION_ONE);
+        list_req.__set_path(path / "*");
+        list_req.__set_isRecursive(false);
+        list_req.__set_properties(_broker_prop);
+        list_req.__set_fileNameOnly(true); // we only need file name, not abs path
+
+        try {
+            (*_client)->listPath(list_rep, list_req);
+        } catch (const apache::thrift::transport::TTransportException&) {
+            RETURN_IF_ERROR((*_client).reopen());
+            (*_client)->listPath(list_rep, list_req);
+        }
+
+        if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) {
+            LOG(INFO) << "path does not exist: " << path;
+            return Status::OK();
+        } else if (list_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+            std::stringstream ss;
+            ss << "failed to list files from remote path: " << path
+               << ", msg: " << list_rep.opStatus.message;
+            return Status::InternalError(ss.str());
+        }
+        LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size();
+
+        // split file name and checksum
+        for (const auto& file : list_rep.files) {
+            if (file.isDir) {
+                // this is not a file
+                continue;
+            }
+
+            const std::string& file_name = file.path;
+            const size_t pos = file_name.find_last_of('.');
+            if (pos == std::string::npos || pos == file_name.size() - 1) {
+                // Not found checksum separator, ignore this file
+                continue;
+            }
+
+            files->emplace_back(std::string(file_name, 0, pos));
+            VLOG(2) << "split remote file: " << std::string(file_name, 0, pos)
+                    << ", checksum: " << std::string(file_name, pos + 1);
+        }
+
+        LOG(INFO) << "finished to split files. valid file num: " << files->size();
+
+    } catch (const apache::thrift::TException& e) {
+        std::stringstream ss;
+        ss << "failed to list files in remote path: " << path << ", msg: " << e.what();
+        return Status::RpcError(ss.str());
+    }
+    return Status::OK();
+}
+
+// Hands out the shared broker connection held in `_client` by copying the
+// shared_ptr into `*client`. Returns InternalError (via CHECK_BROKER_CLIENT)
+// when `_client` is null, otherwise OK.
+Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const {
+ CHECK_BROKER_CLIENT(_client);
+ *client = _client;
+ return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
new file mode 100644
index 0000000000..5b6cce33ba
--- /dev/null
+++ b/be/src/io/fs/broker_file_system.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/remote_file_system.h"
+#include "runtime/client_cache.h"
+namespace doris {
+
+namespace io {
+// RemoteFileSystem implementation that performs every operation through RPCs
+// to a broker process. connect() must be called before any other operation.
+// Creating, linking and uploading files are not supported.
+class BrokerFileSystem final : public RemoteFileSystem {
+public:
+    // `broker_addr` is the broker to talk to; `broker_prop` is sent along
+    // with the requests. Both are copied.
+    BrokerFileSystem(const TNetworkAddress& broker_addr,
+                     const std::map<std::string, std::string>& broker_prop);
+    ~BrokerFileSystem() override = default;
+
+    Status create_file(const Path& /*path*/, FileWriterPtr* /*writer*/) override {
+        return Status::NotSupported("Currently not support to create file through broker.");
+    }
+
+    // Opens `path` on the broker and returns a reader for it in `*reader`.
+    Status open_file(const Path& path, FileReaderSPtr* reader) override;
+
+    Status delete_file(const Path& path) override;
+
+    // Always returns NotSupported.
+    Status create_directory(const Path& path) override;
+
+    // Delete all files under path.
+    Status delete_directory(const Path& path) override;
+
+    Status link_file(const Path& /*src*/, const Path& /*dest*/) override {
+        return Status::NotSupported("Not supported link file through broker.");
+    }
+
+    Status exists(const Path& path, bool* res) const override;
+
+    Status file_size(const Path& path, size_t* file_size) const override;
+
+    Status list(const Path& path, std::vector<Path>* files) override;
+
+    Status upload(const Path& /*local_path*/, const Path& /*dest_path*/) override {
+        return Status::NotSupported("Currently not support to upload file through broker.");
+    }
+
+    Status batch_upload(const std::vector<Path>& /*local_paths*/,
+                        const std::vector<Path>& /*dest_paths*/) override {
+        return Status::NotSupported("Currently not support to batch upload file through broker.");
+    }
+
+    // Obtains the broker connection used by all other operations.
+    Status connect() override;
+
+    // Copies the shared broker connection into `*client`; fails if connect()
+    // has not provided one.
+    Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const;
+
+private:
+    // Held by value on purpose: this object is heap-allocated and handed to
+    // callers, so it must not depend on the lifetime of the constructor
+    // arguments (reference members would dangle once those go away).
+    TNetworkAddress _broker_addr;
+    std::map<std::string, std::string> _broker_prop;
+
+    std::shared_ptr<BrokerServiceConnection> _client;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index d97148f4c5..2dde2a64e7 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -38,6 +38,7 @@ enum class FileSystemType : uint8_t {
LOCAL,
S3,
HDFS,
+ BROKER,
};
class FileSystem {
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 1b77e64238..ef03541387 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -64,8 +64,8 @@ Status HdfsFileReader::read_at(size_t offset, Slice result, const IOContext& /*i
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, _file_size - offset);
+ *bytes_read = 0;
if (UNLIKELY(bytes_req == 0)) {
- *bytes_read = 0;
return Status::OK();
}
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index d219e0393a..cafa7ca34c 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -96,7 +96,7 @@ Status HdfsFileSystem::create_file(const Path& /*path*/, FileWriterPtr* /*writer
// }
// hdfsCloseFile(handle->hdfs_fs, hdfs_file);
// return Status::OK();
- return Status::NotSupported("Currently not support to upload file to HDFS");
+ return Status::NotSupported("Currently not support to create file to HDFS");
}
Status HdfsFileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
@@ -161,7 +161,12 @@ Status HdfsFileSystem::delete_directory(const Path& path) {
Status HdfsFileSystem::exists(const Path& path, bool* res) const {
CHECK_HDFS_HANDLE(_fs_handle);
- *res = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+ int is_exists = hdfsExists(_fs_handle->hdfs_fs, path.string().c_str());
+ if (is_exists == 0) {
+ *res = true;
+ } else {
+ *res = false;
+ }
return Status::OK();
}
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index d3bebc7eb0..e5dad525fb 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -171,6 +171,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(hdfs_file_reader_total, MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(broker_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM);
@@ -183,6 +184,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYS
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(hdfs_file_open_reading, MetricUnit::FILESYSTEM);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(broker_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM);
@@ -301,6 +303,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, hdfs_file_reader_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, broker_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total);
@@ -312,6 +315,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, hdfs_file_open_reading);
+ INT_GAUGE_METRIC_REGISTER(_server_metric_entity, broker_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);
}
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index e59009a99c..a43858d055 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -155,6 +155,7 @@ public:
IntCounter* local_file_reader_total;
IntCounter* s3_file_reader_total;
IntCounter* hdfs_file_reader_total;
+ IntCounter* broker_file_reader_total;
IntCounter* local_file_writer_total;
IntCounter* s3_file_writer_total;
IntCounter* file_created_total;
@@ -166,6 +167,7 @@ public:
IntGauge* local_file_open_reading;
IntGauge* s3_file_open_reading;
IntGauge* hdfs_file_open_reading;
+ IntGauge* broker_file_open_reading;
IntGauge* local_file_open_writing;
IntGauge* s3_file_open_writing;
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.cpp b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
index 23d6fa4eab..b8f2252885 100644
--- a/be/src/vec/exec/format/file_reader/new_file_factory.cpp
+++ b/be/src/vec/exec/format/file_reader/new_file_factory.cpp
@@ -20,6 +20,7 @@
#include "io/broker_reader.h"
#include "io/broker_writer.h"
#include "io/buffered_reader.h"
+#include "io/fs/broker_file_system.h"
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
@@ -126,6 +127,12 @@ Status NewFileFactory::create_file_reader(RuntimeProfile* /*profile*/,
&file_system_ptr, file_reader));
break;
}
+ case TFileType::FILE_BROKER: {
+ RETURN_IF_ERROR(create_broker_reader(system_properties.broker_addresses[0],
+ system_properties.properties, file_description.path,
+ &file_system_ptr, file_reader));
+ break;
+ }
default:
return Status::NotSupported("unsupported file reader type: {}", std::to_string(type));
}
@@ -179,4 +186,15 @@ Status NewFileFactory::create_s3_reader(const std::map<std::string, std::string>
(*s3_file_system)->open_file(s3_uri.get_key(), reader);
return Status::OK();
}
+
+// Creates a BrokerFileSystem for `broker_addr`, connects it and opens `path`
+// on it.
+//
+// On success `*broker_file_system` receives the heap-allocated file system
+// (the caller takes ownership) and `*reader` the opened reader. On failure
+// the Status of connect() / open_file() is returned, the file system is
+// destroyed and `*broker_file_system` is left untouched.
+Status NewFileFactory::create_broker_reader(const TNetworkAddress& broker_addr,
+                                            const std::map<std::string, std::string>& prop,
+                                            const std::string& path,
+                                            io::FileSystem** broker_file_system,
+                                            io::FileReaderSPtr* reader) {
+    // Keep ownership local until every step has succeeded so that an early
+    // return cannot leak the file system.
+    auto fs = std::make_unique<io::BrokerFileSystem>(broker_addr, prop);
+    // Both calls report failures through their Status. Dropping it would hand
+    // the caller a null reader together with Status::OK().
+    RETURN_IF_ERROR(fs->connect());
+    RETURN_IF_ERROR(fs->open_file(path, reader));
+    *broker_file_system = fs.release();
+    return Status::OK();
+}
} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_file_factory.h b/be/src/vec/exec/format/file_reader/new_file_factory.h
index 82fc1fe6cf..8167618859 100644
--- a/be/src/vec/exec/format/file_reader/new_file_factory.h
+++ b/be/src/vec/exec/format/file_reader/new_file_factory.h
@@ -92,6 +92,11 @@ public:
const std::string& path, io::FileSystem** s3_file_system,
io::FileReaderSPtr* reader);
+    // Creates and connects a broker file system for `broker_addr`, then opens
+    // `path` on it. The file system is returned through `broker_file_system`
+    // and the reader through `reader`.
+    static Status create_broker_reader(const TNetworkAddress& broker_addr,
+                                       const std::map<std::string, std::string>& prop,
+                                       const std::string& path,
+                                       io::FileSystem** broker_file_system,
+                                       io::FileReaderSPtr* reader);
+
static TFileType::type convert_storage_type(TStorageBackendType::type type) {
switch (type) {
case TStorageBackendType::LOCAL:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org