You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/07/14 04:16:40 UTC
[doris] branch master updated: [enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b013f8006d [enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)
b013f8006d is described below
commit b013f8006d74c8dc159c9e10a0ade6e27a16519a
Author: Siyang Tang <82...@users.noreply.github.com>
AuthorDate: Fri Jul 14 12:16:32 2023 +0800
[enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)
---
be/src/io/file_factory.cpp | 27 +++++---
be/src/io/file_factory.h | 2 +-
be/src/io/fs/multi_table_pipe.cpp | 85 +++++++++++++++++++------
be/src/io/fs/multi_table_pipe.h | 3 +
be/src/io/fs/stream_load_pipe.cpp | 7 ++
be/src/io/fs/stream_load_pipe.h | 3 +
be/src/pipeline/pipeline_fragment_context.cpp | 6 +-
be/src/runtime/runtime_state.cpp | 6 +-
be/src/runtime/runtime_state.h | 9 ++-
be/src/vec/exec/format/csv/csv_reader.cpp | 3 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 3 +-
11 files changed, 114 insertions(+), 40 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index adc02a0aa2..d46d2c5b4c 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -144,7 +144,7 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr
// file scan node/stream load pipe
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
- const TUniqueId& fragment_instance_id) {
+ RuntimeState* runtime_state) {
auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
@@ -152,15 +152,26 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
*file_reader = stream_load_ctx->pipe;
- if (file_reader->get() != nullptr) {
- auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
- if (multi_table_pipe != nullptr) {
- *file_reader = multi_table_pipe->getPipe(fragment_instance_id);
- LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id
- << " pipe: " << (*file_reader).get();
- }
+ if (file_reader->get() == nullptr) {
+ return Status::OK();
}
+ auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
+ if (multi_table_pipe == nullptr || runtime_state == nullptr) {
+ return Status::OK();
+ }
+
+ TUniqueId pipe_id;
+ if (runtime_state->enable_pipeline_exec()) {
+ pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
+ runtime_state->fragment_id());
+ } else {
+ pipe_id = runtime_state->fragment_instance_id();
+ }
+ *file_reader = multi_table_pipe->getPipe(pipe_id);
+ LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
+ << " pipe: " << (*file_reader).get();
+
return Status::OK();
}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 42589df531..a311b8d58b 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -66,7 +66,7 @@ public:
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
- const TUniqueId& fragment_instance_id);
+ RuntimeState* runtime_state);
static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
const io::FileReaderOptions& reader_options,
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 853baa4f12..12de3fa8d4 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -20,11 +20,15 @@
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <thrift/protocol/TDebugProtocol.h>
+#include <type_traits>
+
#include "common/status.h"
#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
@@ -130,7 +134,9 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
#ifndef BE_TEST
Status MultiTablePipe::request_and_exec_plans() {
- if (_unplanned_pipes.empty()) return Status::OK();
+ if (_unplanned_pipes.empty()) {
+ return Status::OK();
+ }
// get list of table names in unplanned pipes
std::vector<std::string> tables;
@@ -175,24 +181,52 @@ Status MultiTablePipe::request_and_exec_plans() {
return plan_status;
}
+ Status st;
+ if (_ctx->multi_table_put_result.__isset.params &&
+ !_ctx->multi_table_put_result.__isset.pipeline_params) {
+ st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
+ } else if (!_ctx->multi_table_put_result.__isset.params &&
+ _ctx->multi_table_put_result.__isset.pipeline_params) {
+ st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
+ } else {
+ return Status::Aborted("too many or too few params are set in multi_table_put_result.");
+ }
+
+ return st;
+}
+
+template <typename ExecParam>
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
// put unplanned pipes into planned pipes and clear unplanned pipes
for (auto& pipe : _unplanned_pipes) {
_ctx->table_list.push_back(pipe.first);
_planned_pipes.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
- _unplanned_pipes.size(), _planned_pipes.size(),
- _ctx->multi_table_put_result.params.size());
+ _unplanned_pipes.size(), _planned_pipes.size(), params.size());
_unplanned_pipes.clear();
- _inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
- for (auto& plan : _ctx->multi_table_put_result.params) {
- // TODO: use pipeline in the future (currently is buggy for load)
- DCHECK_EQ(plan.__isset.table_name, true);
- DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
- putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
- LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
- << " table=" << plan.table_name;
+ _inflight_plan_cnt += params.size();
+ for (auto& plan : params) {
+ if (!plan.__isset.table_name ||
+ _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
+ return Status::Aborted("Missing vital param: table_name");
+ }
+
+ if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
+ putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
+ LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
+ << " table=" << plan.table_name;
+ } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
+ auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
+ putPipe(pipe_id, _planned_pipes[plan.table_name]);
+ LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name;
+ } else {
+ LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
+ "`TPipelineFragmentParams`, will crash";
+ CHECK(false);
+ }
+
exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
Status* status) {
{
@@ -243,6 +277,7 @@ Status MultiTablePipe::request_and_exec_plans() {
return Status::OK();
}
+
#else
Status MultiTablePipe::request_and_exec_plans() {
// put unplanned pipes into planned pipes
@@ -254,36 +289,46 @@ Status MultiTablePipe::request_and_exec_plans() {
_unplanned_pipes.clear();
return Status::OK();
}
+
+template <typename ExecParam>
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
+ return Status::OK();
+}
+
#endif
-Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id,
- std::shared_ptr<io::StreamLoadPipe> pipe) {
+Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
- auto it = _pipe_map.find(fragment_instance_id);
+ auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
return Status::InternalError("id already exist");
}
- _pipe_map.emplace(fragment_instance_id, pipe);
+ _pipe_map.emplace(pipe_id, pipe);
return Status::OK();
}
-std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) {
+std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
- auto it = _pipe_map.find(fragment_instance_id);
+ auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
return std::shared_ptr<io::StreamLoadPipe>(nullptr);
}
return it->second;
}
-void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) {
+void MultiTablePipe::removePipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
- auto it = _pipe_map.find(fragment_instance_id);
+ auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
_pipe_map.erase(it);
- VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id;
+ VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
}
}
+template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+ std::vector<TExecPlanFragmentParams> params);
+template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+ std::vector<TPipelineFragmentParams> params);
+
} // namespace io
} // namespace doris
\ No newline at end of file
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index b225ea3061..fd30722256 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -68,6 +68,9 @@ private:
// [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);
+ template <typename ExecParam>
+ Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
+
private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index d79cfd028d..23a4e3c604 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -250,5 +250,12 @@ void StreamLoadPipe::cancel(const std::string& reason) {
_put_cond.notify_all();
}
+TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) {
+ TUniqueId pipe_id;
+ pipe_id.lo = query_id.lo + fragment_id;
+ pipe_id.hi = query_id.hi;
+ return pipe_id;
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 848175ce9a..a184cf9e78 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -78,6 +78,9 @@ public:
size_t get_queue_size() { return _buf_queue.size(); }
+ // used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
+ static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7e85b19205..48316a5008 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -217,9 +217,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
.tag("pthread_id", (uintptr_t)pthread_self());
// 1. init _runtime_state
- _runtime_state =
- RuntimeState::create_unique(local_params, request.query_id, request.query_options,
- _query_ctx->query_globals, _exec_env);
+ _runtime_state = RuntimeState::create_unique(local_params, request.query_id,
+ request.fragment_id, request.query_options,
+ _query_ctx->query_globals, _exec_env);
_runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 66bc4a5743..28de6c752e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -99,8 +99,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
}
RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
- const TUniqueId& query_id, const TQueryOptions& query_options,
- const TQueryGlobals& query_globals, ExecEnv* exec_env)
+ const TUniqueId& query_id, int32_t fragment_id,
+ const TQueryOptions& query_options, const TQueryGlobals& query_globals,
+ ExecEnv* exec_env)
: _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
@@ -108,6 +109,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(query_id),
+ _fragment_id(fragment_id),
_is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index d1b90579e2..89543e492b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -39,6 +39,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/factory_creator.h"
#include "common/status.h"
+#include "gutil/integral_types.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
@@ -66,8 +67,8 @@ public:
ExecEnv* exec_env);
RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id,
- const TQueryOptions& query_options, const TQueryGlobals& query_globals,
- ExecEnv* exec_env);
+ int32 fragment_id, const TQueryOptions& query_options,
+ const TQueryGlobals& query_globals, ExecEnv* exec_env);
// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);
@@ -115,6 +116,8 @@ public:
const std::string& user() const { return _user; }
const TUniqueId& query_id() const { return _query_id; }
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
+ // should only be called in pipeline engine
+ int32_t fragment_id() const { return _fragment_id; }
ExecEnv* exec_env() { return _exec_env; }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const;
@@ -460,6 +463,8 @@ private:
cctz::time_zone _timezone_obj;
TUniqueId _query_id;
+ // fragment id for each TPipelineFragmentParams
+ int32_t _fragment_id;
TUniqueId _fragment_instance_id;
TQueryOptions _query_options;
ExecEnv* _exec_env = nullptr;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 7390d9103a..9da29ed6dc 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -166,8 +166,7 @@ Status CsvReader::init_reader(bool is_load) {
_file_description.start_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
- _state->fragment_instance_id()));
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index f6eabaa7cd..7d17f650c6 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -378,8 +378,7 @@ Status NewJsonReader::_open_file_reader() {
_file_description.start_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
- _state->fragment_instance_id()));
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
} else {
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org