You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/07/14 04:16:40 UTC

[doris] branch master updated: [enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b013f8006d [enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)
b013f8006d is described below

commit b013f8006d74c8dc159c9e10a0ade6e27a16519a
Author: Siyang Tang <82...@users.noreply.github.com>
AuthorDate: Fri Jul 14 12:16:32 2023 +0800

    [enhancement](multi-table) enable multi table routine load on pipeline engine (#21729)
---
 be/src/io/file_factory.cpp                      | 27 +++++---
 be/src/io/file_factory.h                        |  2 +-
 be/src/io/fs/multi_table_pipe.cpp               | 85 +++++++++++++++++++------
 be/src/io/fs/multi_table_pipe.h                 |  3 +
 be/src/io/fs/stream_load_pipe.cpp               |  7 ++
 be/src/io/fs/stream_load_pipe.h                 |  3 +
 be/src/pipeline/pipeline_fragment_context.cpp   |  6 +-
 be/src/runtime/runtime_state.cpp                |  6 +-
 be/src/runtime/runtime_state.h                  |  9 ++-
 be/src/vec/exec/format/csv/csv_reader.cpp       |  3 +-
 be/src/vec/exec/format/json/new_json_reader.cpp |  3 +-
 11 files changed, 114 insertions(+), 40 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index adc02a0aa2..d46d2c5b4c 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -144,7 +144,7 @@ Status FileFactory::create_file_reader(const io::FileSystemProperties& system_pr
 
 // file scan node/stream load pipe
 Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
-                                       const TUniqueId& fragment_instance_id) {
+                                       RuntimeState* runtime_state) {
     auto stream_load_ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
     if (!stream_load_ctx) {
         return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
@@ -152,15 +152,26 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
 
     *file_reader = stream_load_ctx->pipe;
 
-    if (file_reader->get() != nullptr) {
-        auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
-        if (multi_table_pipe != nullptr) {
-            *file_reader = multi_table_pipe->getPipe(fragment_instance_id);
-            LOG(INFO) << "create pipe reader for fragment instance: " << fragment_instance_id
-                      << " pipe: " << (*file_reader).get();
-        }
+    if (file_reader->get() == nullptr) {
+        return Status::OK();
     }
 
+    auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
+    if (multi_table_pipe == nullptr || runtime_state == nullptr) {
+        return Status::OK();
+    }
+
+    TUniqueId pipe_id;
+    if (runtime_state->enable_pipeline_exec()) {
+        pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
+                                                        runtime_state->fragment_id());
+    } else {
+        pipe_id = runtime_state->fragment_instance_id();
+    }
+    *file_reader = multi_table_pipe->getPipe(pipe_id);
+    LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
+              << " pipe: " << (*file_reader).get();
+
     return Status::OK();
 }
 
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 42589df531..a311b8d58b 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -66,7 +66,7 @@ public:
 
     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
-                                     const TUniqueId& fragment_instance_id);
+                                     RuntimeState* runtime_state);
 
     static Status create_hdfs_reader(const THdfsParams& hdfs_params, const io::FileDescription& fd,
                                      const io::FileReaderOptions& reader_options,
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 853baa4f12..12de3fa8d4 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -20,11 +20,15 @@
 #include <gen_cpp/FrontendService.h>
 #include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/HeartbeatService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include <type_traits>
+
 #include "common/status.h"
 #include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
@@ -130,7 +134,9 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
 
 #ifndef BE_TEST
 Status MultiTablePipe::request_and_exec_plans() {
-    if (_unplanned_pipes.empty()) return Status::OK();
+    if (_unplanned_pipes.empty()) {
+        return Status::OK();
+    }
 
     // get list of table names in unplanned pipes
     std::vector<std::string> tables;
@@ -175,24 +181,52 @@ Status MultiTablePipe::request_and_exec_plans() {
         return plan_status;
     }
 
+    Status st;
+    if (_ctx->multi_table_put_result.__isset.params &&
+        !_ctx->multi_table_put_result.__isset.pipeline_params) {
+        st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
+    } else if (!_ctx->multi_table_put_result.__isset.params &&
+               _ctx->multi_table_put_result.__isset.pipeline_params) {
+        st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
+    } else {
+        return Status::Aborted("too many or too few params are set in multi_table_put_result.");
+    }
+
+    return st;
+}
+
+template <typename ExecParam>
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
     // put unplanned pipes into planned pipes and clear unplanned pipes
     for (auto& pipe : _unplanned_pipes) {
         _ctx->table_list.push_back(pipe.first);
         _planned_pipes.emplace(pipe.first, pipe.second);
     }
     LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
-                             _unplanned_pipes.size(), _planned_pipes.size(),
-                             _ctx->multi_table_put_result.params.size());
+                             _unplanned_pipes.size(), _planned_pipes.size(), params.size());
     _unplanned_pipes.clear();
 
-    _inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
-    for (auto& plan : _ctx->multi_table_put_result.params) {
-        // TODO: use pipeline in the future (currently is buggy for load)
-        DCHECK_EQ(plan.__isset.table_name, true);
-        DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
-        putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
-        LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
-                  << " table=" << plan.table_name;
+    _inflight_plan_cnt += params.size();
+    for (auto& plan : params) {
+        if (!plan.__isset.table_name ||
+            _planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
+            return Status::Aborted("Missing vital param: table_name");
+        }
+
+        if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
+            putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]);
+            LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id
+                      << " table=" << plan.table_name;
+        } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
+            auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
+            putPipe(pipe_id, _planned_pipes[plan.table_name]);
+            LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name;
+        } else {
+            LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
+                            "`TPipelineFragmentParams`, will crash";
+            CHECK(false);
+        }
+
         exec_env->fragment_mgr()->exec_plan_fragment(plan, [this](RuntimeState* state,
                                                                   Status* status) {
             {
@@ -243,6 +277,7 @@ Status MultiTablePipe::request_and_exec_plans() {
 
     return Status::OK();
 }
+
 #else
 Status MultiTablePipe::request_and_exec_plans() {
     // put unplanned pipes into planned pipes
@@ -254,36 +289,46 @@ Status MultiTablePipe::request_and_exec_plans() {
     _unplanned_pipes.clear();
     return Status::OK();
 }
+
+template <typename ExecParam>
+Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
+    return Status::OK();
+}
+
 #endif
 
-Status MultiTablePipe::putPipe(const TUniqueId& fragment_instance_id,
-                               std::shared_ptr<io::StreamLoadPipe> pipe) {
+Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) {
     std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(fragment_instance_id);
+    auto it = _pipe_map.find(pipe_id);
     if (it != std::end(_pipe_map)) {
         return Status::InternalError("id already exist");
     }
-    _pipe_map.emplace(fragment_instance_id, pipe);
+    _pipe_map.emplace(pipe_id, pipe);
     return Status::OK();
 }
 
-std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& fragment_instance_id) {
+std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& pipe_id) {
     std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(fragment_instance_id);
+    auto it = _pipe_map.find(pipe_id);
     if (it == std::end(_pipe_map)) {
         return std::shared_ptr<io::StreamLoadPipe>(nullptr);
     }
     return it->second;
 }
 
-void MultiTablePipe::removePipe(const TUniqueId& fragment_instance_id) {
+void MultiTablePipe::removePipe(const TUniqueId& pipe_id) {
     std::lock_guard<std::mutex> l(_pipe_map_lock);
-    auto it = _pipe_map.find(fragment_instance_id);
+    auto it = _pipe_map.find(pipe_id);
     if (it != std::end(_pipe_map)) {
         _pipe_map.erase(it);
-        VLOG_NOTICE << "remove stream load pipe: " << fragment_instance_id;
+        VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
     }
 }
 
+template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+                                           std::vector<TExecPlanFragmentParams> params);
+template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
+                                           std::vector<TPipelineFragmentParams> params);
+
 } // namespace io
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index b225ea3061..fd30722256 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -68,6 +68,9 @@ private:
     // [thread-unsafe] dispatch data to corresponding KafkaConsumerPipe
     Status dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb);
 
+    template <typename ExecParam>
+    Status exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params);
+
 private:
     std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
     std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index d79cfd028d..23a4e3c604 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -250,5 +250,12 @@ void StreamLoadPipe::cancel(const std::string& reason) {
     _put_cond.notify_all();
 }
 
+TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) {
+    TUniqueId pipe_id;
+    pipe_id.lo = query_id.lo + fragment_id;
+    pipe_id.hi = query_id.hi;
+    return pipe_id;
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 848175ce9a..a184cf9e78 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -78,6 +78,9 @@ public:
 
     size_t get_queue_size() { return _buf_queue.size(); }
 
+    // Used for pipeline load, which uses TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as the pipe_id.
+    static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7e85b19205..48316a5008 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -217,9 +217,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
             .tag("pthread_id", (uintptr_t)pthread_self());
 
     // 1. init _runtime_state
-    _runtime_state =
-            RuntimeState::create_unique(local_params, request.query_id, request.query_options,
-                                        _query_ctx->query_globals, _exec_env);
+    _runtime_state = RuntimeState::create_unique(local_params, request.query_id,
+                                                 request.fragment_id, request.query_options,
+                                                 _query_ctx->query_globals, _exec_env);
     _runtime_state->set_query_ctx(_query_ctx.get());
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 66bc4a5743..28de6c752e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -99,8 +99,9 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
 }
 
 RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
-                           const TUniqueId& query_id, const TQueryOptions& query_options,
-                           const TQueryGlobals& query_globals, ExecEnv* exec_env)
+                           const TUniqueId& query_id, int32_t fragment_id,
+                           const TQueryOptions& query_options, const TQueryGlobals& query_globals,
+                           ExecEnv* exec_env)
         : _profile("Fragment " + print_id(pipeline_params.fragment_instance_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -108,6 +109,7 @@ RuntimeState::RuntimeState(const TPipelineInstanceParams& pipeline_params,
           _data_stream_recvrs_pool(new ObjectPool()),
           _unreported_error_idx(0),
           _query_id(query_id),
+          _fragment_id(fragment_id),
           _is_cancelled(false),
           _per_fragment_instance_idx(0),
           _num_rows_load_total(0),
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index d1b90579e2..89543e492b 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -39,6 +39,7 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "gutil/integral_types.h"
 #include "util/runtime_profile.h"
 #include "util/telemetry/telemetry.h"
 
@@ -66,8 +67,8 @@ public:
                  ExecEnv* exec_env);
 
     RuntimeState(const TPipelineInstanceParams& pipeline_params, const TUniqueId& query_id,
-                 const TQueryOptions& query_options, const TQueryGlobals& query_globals,
-                 ExecEnv* exec_env);
+                 int32 fragment_id, const TQueryOptions& query_options,
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
 
     // RuntimeState for executing expr in fe-support.
     RuntimeState(const TQueryGlobals& query_globals);
@@ -115,6 +116,8 @@ public:
     const std::string& user() const { return _user; }
     const TUniqueId& query_id() const { return _query_id; }
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
+    // should only be called in pipeline engine
+    int32_t fragment_id() const { return _fragment_id; }
     ExecEnv* exec_env() { return _exec_env; }
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const;
 
@@ -460,6 +463,8 @@ private:
     cctz::time_zone _timezone_obj;
 
     TUniqueId _query_id;
+    // fragment id for each TPipelineFragmentParams
+    int32_t _fragment_id;
     TUniqueId _fragment_instance_id;
     TQueryOptions _query_options;
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 7390d9103a..9da29ed6dc 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -166,8 +166,7 @@ Status CsvReader::init_reader(bool is_load) {
     _file_description.start_offset = start_offset;
 
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
-                                                        _state->fragment_instance_id()));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
     } else {
         io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index f6eabaa7cd..7d17f650c6 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -378,8 +378,7 @@ Status NewJsonReader::_open_file_reader() {
     _file_description.start_offset = start_offset;
 
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader,
-                                                        _state->fragment_instance_id()));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state));
     } else {
         io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org