Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/03 14:20:14 UTC

[incubator-doris] branch dev-1.0.1 updated (ad0216f823 -> a4252467c4)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from ad0216f823 [fix](hive) fix bug of invalid user info in external table's scan node (#9908)
     new d76e4291c2 [improvement] Optimize send fragment logic to reduce send fragment timeout error (#9720)
     new fd2375a59b [fix] (planner) slot nullable does not set correctly when plan outer join with inline view (#9927)
     new a4252467c4 [fix] fix grammar of ADMIN SHOW TABLET STORAGE FORMAT stmt (#9938)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/fragment_mgr.cpp                    | 131 +++++---
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/runtime/query_fragments_ctx.h               |  22 ++
 be/src/service/internal_service.cpp                |  54 +++-
 be/src/service/internal_service.h                  |  13 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |   9 +-
 .../org/apache/doris/analysis/InlineViewRef.java   |   8 +-
 .../org/apache/doris/catalog/TabletStatMgr.java    |   2 +-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  10 +-
 .../apache/doris/clone/ClusterLoadStatistic.java   |   2 +-
 .../java/org/apache/doris/clone/TabletChecker.java |   2 +-
 .../org/apache/doris/clone/TabletScheduler.java    |   4 +-
 .../main/java/org/apache/doris/common/Config.java  |   9 -
 .../load/routineload/RoutineLoadScheduler.java     |   4 +-
 .../org/apache/doris/master/ReportHandler.java     |   2 +-
 .../apache/doris/planner/SingleNodePlanner.java    |   9 +
 .../main/java/org/apache/doris/qe/Coordinator.java | 349 +++++++++++++--------
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |   7 +-
 .../java/org/apache/doris/qe/ResultReceiver.java   |   4 +-
 .../org/apache/doris/rpc/BackendServiceClient.java |  13 +-
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  56 ++--
 .../java/org/apache/doris/rpc/RpcException.java    |   5 +
 .../doris/load/sync/canal/CanalSyncDataTest.java   |  13 +-
 .../apache/doris/utframe/MockedBackendFactory.java |  24 +-
 gensrc/proto/internal_service.proto                |  14 +
 gensrc/thrift/PaloInternalService.thrift           |  24 +-
 .../test_outer_join_with_inline_view.out           |  13 +
 .../test_outer_join_with_inline_view.groovy        |  72 +++++
 29 files changed, 638 insertions(+), 243 deletions(-)
 create mode 100644 regression-test/data/correctness/test_outer_join_with_inline_view.out
 create mode 100644 regression-test/suites/correctness/test_outer_join_with_inline_view.groovy


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/03: [improvement] Optimize send fragment logic to reduce send fragment timeout error (#9720)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d76e4291c281f93bfe2d4666d10fff85d05c04c6
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Jun 3 15:47:40 2022 +0800

    [improvement] Optimize send fragment logic to reduce send fragment timeout error (#9720)

    This CL mainly changes:
    1. Reduce the rpc timeout errors caused by rpcs waiting for a brpc worker thread:
        1. Merge all fragment instances on the same BE into one request, to reduce the number of send fragment rpcs.
        2. If the number of fragments is >= 3, use a 2-phase RPC: the first phase sends all fragments,
           the second starts them. This way there are at most 2 RPCs per query on each BE.
    2. Set the timeout of the send fragment rpc to the query timeout, to keep it consistent with
       users' expectation of the query timeout period.
    3. Do not close the connection anymore when an rpc timeout occurs.
    4. Change some log levels from info to debug to simplify the fe.log content.

    NOTICE:
    1. The definition of the execPlanFragment rpc is changed, so BE must be upgraded first.
    2. Remove FE config `remote_fragment_exec_timeout_ms`.
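
For illustration, here is a minimal, self-contained Java sketch of the two-phase dispatch
described above. The backend names, fragment-instance ids, and the RPC helpers
(sendFragments, startExecution) are all stand-ins invented for this sketch; the real logic
lives in Coordinator.sendFragment and FragmentMgr in the diffs below.

    import java.util.*;
    import java.util.concurrent.*;

    public class TwoPhaseSendSketch {
        static final ExecutorService pool = Executors.newCachedThreadPool();

        // Stand-in for the exec_plan_fragment RPC carrying all instances for one BE.
        static Future<String> sendFragments(String be, List<String> instances, boolean prepareOnly) {
            return pool.submit(() -> be + (prepareOnly ? " prepared " : " executed ") + instances);
        }

        // Stand-in for the exec_plan_fragment_start RPC.
        static Future<String> startExecution(String be) {
            return pool.submit(() -> be + " started");
        }

        public static void main(String[] args) throws Exception {
            // Fragment instances already grouped by backend, as sendFragment now does.
            Map<String, List<String>> byBackend = Map.of(
                    "be1", List.of("f0.i0", "f1.i0"),
                    "be2", List.of("f0.i1", "f1.i1", "f2.i0"));
            boolean twoPhase = true; // in Doris: fragments.size() >= 3
            long deadline = System.currentTimeMillis() + 5_000; // absolute query deadline

            // Phase 1: one RPC per backend, shipping all of its fragment instances.
            List<Future<String>> futures = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : byBackend.entrySet()) {
                futures.add(sendFragments(e.getKey(), e.getValue(), twoPhase));
            }
            for (Future<String> f : futures) {
                // Each wait uses the time remaining toward the query deadline.
                System.out.println(f.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }

            if (twoPhase) {
                // Phase 2: a tiny RPC per backend that releases the prepared fragments.
                futures.clear();
                for (String be : byBackend.keySet()) {
                    futures.add(startExecution(be));
                }
                for (Future<String> f : futures) {
                    System.out.println(f.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
                }
            }
            pool.shutdown();
        }
    }

Whichever branch is taken, each backend sees at most two RPCs for the whole query.
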
---
 be/src/runtime/fragment_mgr.cpp                    | 131 +++++---
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/runtime/query_fragments_ctx.h               |  22 ++
 be/src/service/internal_service.cpp                |  54 +++-
 be/src/service/internal_service.h                  |  13 +-
 .../org/apache/doris/catalog/TabletStatMgr.java    |   2 +-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  10 +-
 .../apache/doris/clone/ClusterLoadStatistic.java   |   2 +-
 .../java/org/apache/doris/clone/TabletChecker.java |   2 +-
 .../org/apache/doris/clone/TabletScheduler.java    |   4 +-
 .../main/java/org/apache/doris/common/Config.java  |   9 -
 .../load/routineload/RoutineLoadScheduler.java     |   4 +-
 .../org/apache/doris/master/ReportHandler.java     |   2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 349 +++++++++++++--------
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |   7 +-
 .../java/org/apache/doris/qe/ResultReceiver.java   |   4 +-
 .../org/apache/doris/rpc/BackendServiceClient.java |  13 +-
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  56 ++--
 .../java/org/apache/doris/rpc/RpcException.java    |   5 +
 .../doris/load/sync/canal/CanalSyncDataTest.java   |  13 +-
 .../apache/doris/utframe/MockedBackendFactory.java |  24 +-
 gensrc/proto/internal_service.proto                |  14 +
 gensrc/thrift/PaloInternalService.thrift           |  24 +-
 23 files changed, 528 insertions(+), 238 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 02bf9d5bec..93bed66116 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -139,6 +139,8 @@ public:
     void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
     std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
 
+    void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
+
 private:
     void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);
 
@@ -170,6 +172,9 @@ private:
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
     // The pipe for data transfering, such as insert.
     std::shared_ptr<StreamLoadPipe> _pipe;
+
+    // If set to true, this plan fragment will be executed only after FE sends the execution start rpc.
+    bool _need_wait_execution_trigger = false;
 };
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -224,6 +229,11 @@ Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
 }
 
 Status FragmentExecState::execute() {
+    if (_need_wait_execution_trigger) {
+        // If _need_wait_execution_trigger is true, this instance
+        // is prepared but needs to wait for the signal before doing the rest of the execution.
+        _fragments_ctx->wait_for_start();
+    }
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
@@ -518,6 +528,22 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
     }
 }
 
+Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
+    std::lock_guard<std::mutex> lock(_lock);
+    TUniqueId query_id;
+    query_id.__set_hi(request->query_id().hi());
+    query_id.__set_lo(request->query_id().lo());
+    auto search = _fragments_ctx_map.find(query_id);
+    if (search == _fragments_ctx_map.end()) {
+        return Status::InternalError(
+                strings::Substitute("Failed to get query fragments context. Query may have "
+                                    "timed out or been cancelled. host: $0",
+                                    BackendOptions::get_localhost()));
+    }
+    search->second->set_ready_to_execute();
+    return Status::OK();
+}
+
 void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
                            std::shared_ptr<StreamLoadPipe> pipe) {
     {
@@ -553,65 +579,62 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     }
 
     std::shared_ptr<FragmentExecState> exec_state;
-    if (!params.__isset.is_simplified_param) {
-        // This is an old version params, all @Common components is set in TExecPlanFragmentParams.
-        exec_state.reset(new FragmentExecState(params.params.query_id,
-                                               params.params.fragment_instance_id,
-                                               params.backend_num, _exec_env, params.coord));
+    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    if (params.is_simplified_param) {
+        // Get common components from _fragments_ctx_map
+        std::lock_guard<std::mutex> lock(_lock);
+        auto search = _fragments_ctx_map.find(params.params.query_id);
+        if (search == _fragments_ctx_map.end()) {
+            return Status::InternalError(
+                    strings::Substitute("Failed to get query fragments context. Query may have "
+                                        "timed out or been cancelled. host: $0",
+                                        BackendOptions::get_localhost()));
+        }
+        fragments_ctx = search->second;
     } else {
-        std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-        if (params.is_simplified_param) {
-            // Get common components from _fragments_ctx_map
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(params.params.query_id);
-            if (search == _fragments_ctx_map.end()) {
-                return Status::InternalError(
-                        strings::Substitute("Failed to get query fragments context. Query may be "
-                                            "timeout or be cancelled. host: ",
-                                            BackendOptions::get_localhost()));
-            }
-            fragments_ctx = search->second;
-        } else {
-            // This may be a first fragment request of the query.
-            // Create the query fragments context.
-            fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
-            fragments_ctx->query_id = params.params.query_id;
-            RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
-                                                  &(fragments_ctx->desc_tbl)));
-            fragments_ctx->coord_addr = params.coord;
-            fragments_ctx->query_globals = params.query_globals;
-
-            if (params.__isset.resource_info) {
-                fragments_ctx->user = params.resource_info.user;
-                fragments_ctx->group = params.resource_info.group;
-                fragments_ctx->set_rsc_info = true;
-            }
+        // This may be a first fragment request of the query.
+        // Create the query fragments context.
+        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
+        fragments_ctx->query_id = params.params.query_id;
+        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
+                                              &(fragments_ctx->desc_tbl)));
+        fragments_ctx->coord_addr = params.coord;
+        fragments_ctx->query_globals = params.query_globals;
 
-            if (params.__isset.query_options) {
-                fragments_ctx->timeout_second = params.query_options.query_timeout;
-                if (params.query_options.__isset.resource_limit) {
-                    fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
-                }
+        if (params.__isset.resource_info) {
+            fragments_ctx->user = params.resource_info.user;
+            fragments_ctx->group = params.resource_info.group;
+            fragments_ctx->set_rsc_info = true;
+        }
+
+        if (params.__isset.query_options) {
+            fragments_ctx->timeout_second = params.query_options.query_timeout;
+            if (params.query_options.__isset.resource_limit) {
+                fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
             }
+        }
 
-            {
-                // Find _fragments_ctx_map again, in case some other request has already
-                // create the query fragments context.
-                std::lock_guard<std::mutex> lock(_lock);
-                auto search = _fragments_ctx_map.find(params.params.query_id);
-                if (search == _fragments_ctx_map.end()) {
-                    _fragments_ctx_map.insert(
-                            std::make_pair(fragments_ctx->query_id, fragments_ctx));
-                } else {
-                    // Already has a query fragmentscontext, use it
-                    fragments_ctx = search->second;
-                }
+        {
+            // Find _fragments_ctx_map again, in case some other request has already
+            // created the query fragments context.
+            std::lock_guard<std::mutex> lock(_lock);
+            auto search = _fragments_ctx_map.find(params.params.query_id);
+            if (search == _fragments_ctx_map.end()) {
+                _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
+            } else {
+                // Already has a query fragments context, use it
+                fragments_ctx = search->second;
             }
         }
+    }
 
-        exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
-                                               params.params.fragment_instance_id,
-                                               params.backend_num, _exec_env, fragments_ctx));
+    exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
+                                           params.params.fragment_instance_id, params.backend_num,
+                                           _exec_env, fragments_ctx));
+    if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
+        // Setting need_wait_execution_trigger means this instance will not actually be executed
+        // until the execPlanFragmentStart RPC triggers it.
+        exec_state->set_need_wait_execution_trigger();
     }
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
@@ -663,6 +686,7 @@ void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
         std::vector<TUniqueId> to_cancel;
+        std::vector<TUniqueId> to_cancel_queries;
         DateTimeValue now = DateTimeValue::local_time();
         {
             std::lock_guard<std::mutex> lock(_lock);
@@ -673,6 +697,9 @@ void FragmentMgr::cancel_worker() {
             }
             for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
+                    // The execution logic of the instance needs to be notified,
+                    // so that it will eventually cancel the execution plan.
+                    it->second->set_ready_to_execute();
                     it = _fragments_ctx_map.erase(it);
                 } else {
                     ++it;
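
As a side note, the interplay between start_query_execution and the timeout path in cancel_worker
above can be summarized in a small sketch: the start RPC looks the query up and trips its gate,
while the timeout path trips the same gate before erasing the context, so waiting instances wake up
and cancel themselves. This is illustrative Java with hypothetical names, not the BE implementation.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    class StartQuerySketch {
        // queryId -> one-shot "ready to execute" gate, analogous to _fragments_ctx_map.
        final Map<String, CountDownLatch> ctxMap = new ConcurrentHashMap<>();

        String startQueryExecution(String queryId) {
            CountDownLatch gate = ctxMap.get(queryId);
            if (gate == null) {
                // Mirrors the InternalError above: the query already timed out or was cancelled.
                return "Failed to get query fragments context for " + queryId;
            }
            gate.countDown(); // releases every prepared fragment instance of this query
            return "OK";
        }

        void cancelTimedOutQuery(String queryId) {
            CountDownLatch gate = ctxMap.remove(queryId);
            if (gate != null) {
                gate.countDown(); // wake waiters so their execution logic can cancel the plan
            }
        }
    }
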
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c208a03fdd..024cbfd23c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -66,6 +66,8 @@ public:
     // TODO(zc): report this is over
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);
 
+    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
+
     Status cancel(const TUniqueId& fragment_id) {
         return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
     }
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 720cda4146..73f6b415b9 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <condition_variable>
 #include <string>
 
 #include "common/object_pool.h"
@@ -65,6 +66,21 @@ public:
         return _thread_token.get();
     }
 
+    void set_ready_to_execute() {
+        {
+            std::lock_guard<std::mutex> l(_start_lock);
+            _ready_to_execute = true;
+        }
+        _start_cond.notify_all();
+    }
+
+    void wait_for_start() {
+        std::unique_lock<std::mutex> l(_start_lock);
+        while (!_ready_to_execute.load()) {
+            _start_cond.wait(l);
+        }
+    }
+
 public:
     TUniqueId query_id;
     DescriptorTbl* desc_tbl;
@@ -94,6 +110,12 @@ private:
     // So that we can control the max thread that a query can be used to execute.
     // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
+
+    std::mutex _start_lock;
+    std::condition_variable _start_cond;
+    // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
+    // All fragments of this query will start execution when this is set to true.
+    std::atomic<bool> _ready_to_execute {false};
 };
 
 } // end of namespace
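
The wait_for_start / set_ready_to_execute pair above is the standard condition-variable gate:
the flag is set under the lock before notifying, and the waiter re-checks the flag in a loop so
spurious wakeups are harmless. A Java rendering of the same pattern, for illustration only:

    class ExecutionGate {
        private boolean readyToExecute = false;

        synchronized void setReadyToExecute() {
            readyToExecute = true;
            notifyAll(); // wake every fragment instance blocked in waitForStart()
        }

        synchronized void waitForStart() throws InterruptedException {
            while (!readyToExecute) { // loop guards against spurious wakeups
                wait();
            }
        }
    }
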
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 4cb6b8f7ee..c1185591e6 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -104,13 +104,33 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
-    st = _exec_plan_fragment(request->request(), compact);
+    PFragmentRequestVersion version =
+            request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
+    st = _exec_plan_fragment(request->request(), version, compact);
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
     }
     st.to_protobuf(response->mutable_status());
 }
 
+template <typename T>
+void PInternalServiceImpl<T>::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base,
+                                                      const PExecPlanFragmentRequest* request,
+                                                      PExecPlanFragmentResult* response,
+                                                      google::protobuf::Closure* done) {
+    exec_plan_fragment(cntl_base, request, response, done);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcController* controller,
+                                                    const PExecPlanFragmentStartRequest* request,
+                                                    PExecPlanFragmentResult* result,
+                                                    google::protobuf::Closure* done) {
+    brpc::ClosureGuard closure_guard(done);
+    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+    st.to_protobuf(result->mutable_status());
+}
+
 template <typename T>
 void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
                                                       const PTabletWriterAddBatchRequest* request,
@@ -162,14 +182,32 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
 }
 
 template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request, bool compact) {
-    TExecPlanFragmentParams t_request;
-    {
-        const uint8_t* buf = (const uint8_t*)ser_request.data();
-        uint32_t len = ser_request.size();
-        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request,
+                                                 PFragmentRequestVersion version, bool compact) {
+    if (version == PFragmentRequestVersion::VERSION_1) {
+        // VERSION_1 should be removed in v1.2
+        TExecPlanFragmentParams t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+        }
+        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+    } else if (version == PFragmentRequestVersion::VERSION_2) {
+        TExecPlanFragmentParamsList t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+        }
+
+        for (const TExecPlanFragmentParams& params : t_request.paramsList) {
+            RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+        }
+        return Status::OK();
+    } else {
+        return Status::InternalError("invalid version");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
 template <typename T>
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index c4073bf86e..550773b950 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -46,6 +46,16 @@ public:
                             PExecPlanFragmentResult* result,
                             google::protobuf::Closure* done) override;
 
+    void exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
+                                    const PExecPlanFragmentRequest* request,
+                                    PExecPlanFragmentResult* result,
+                                    google::protobuf::Closure* done) override;
+
+    void exec_plan_fragment_start(google::protobuf::RpcController* controller,
+                                  const PExecPlanFragmentStartRequest* request,
+                                  PExecPlanFragmentResult* result,
+                                  google::protobuf::Closure* done) override;
+
     void cancel_plan_fragment(google::protobuf::RpcController* controller,
                               const PCancelPlanFragmentRequest* request,
                               PCancelPlanFragmentResult* result,
@@ -117,7 +127,8 @@ public:
                     PHandShakeResponse* response, google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, bool compact);
+    Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version,
+                               bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 57a5c1625b..03eac7ca26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -72,7 +72,7 @@ public class TabletStatMgr extends MasterDaemon {
                 }
             }
         });
-        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
+        LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
                 (System.currentTimeMillis() - start));
 
         // after update replica in all backends, update index row num
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index fb99b881da..9ea4bbc9fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -82,7 +82,7 @@ public class BeLoadRebalancer extends Rebalancer {
         clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
 
         if (lowBEs.isEmpty() && highBEs.isEmpty()) {
-            LOG.info("cluster is balance: {} with medium: {}. skip", clusterName, medium);
+            LOG.debug("cluster is balanced: {} with medium: {}. skip", clusterName, medium);
             return alternativeTablets;
         }
 
@@ -186,9 +186,11 @@ public class BeLoadRebalancer extends Rebalancer {
             }
         } // end for high backends
 
-        LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
-                clusterName, medium, alternativeTablets.size(),
-                alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        if (!alternativeTablets.isEmpty()) {
+            LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
+                    clusterName, medium, alternativeTablets.size(),
+                    alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        }
         return alternativeTablets;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
index 338336e41c..6afddb206f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
@@ -183,7 +183,7 @@ public class ClusterLoadStatistic {
             }
         }
 
-        LOG.info("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}",
+        LOG.debug("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}",
                 medium, avgLoadScore, lowCounter, midCounter, highCounter);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 0025a27dc5..9bcd477a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -209,7 +209,7 @@ public class TabletChecker extends MasterDaemon {
         removePriosIfNecessary();
 
         stat.counterTabletCheckRound.incrementAndGet();
-        LOG.info(stat.incrementalBrief());
+        LOG.debug(stat.incrementalBrief());
     }
 
     private static class CheckerCounter {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 8231269816..97c47b3418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -322,7 +322,7 @@ public class TabletScheduler extends MasterDaemon {
                         infoService, invertedIndex);
                 clusterLoadStatistic.init();
                 newStatisticMap.put(clusterName, tag, clusterLoadStatistic);
-                LOG.info("update cluster {} load statistic:\n{}", clusterName, clusterLoadStatistic.getBrief());
+                LOG.debug("update cluster {} load statistic:\n{}", clusterName, clusterLoadStatistic.getBrief());
             }
         }
 
@@ -352,7 +352,7 @@ public class TabletScheduler extends MasterDaemon {
             pendingTablets.add(tabletCtx);
         }
 
-        LOG.info("adjust priority for all tablets. changed: {}, total: {}", changedNum, size);
+        LOG.debug("adjust priority for all tablets. changed: {}, total: {}", changedNum, size);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 80b144cc04..e289b91b99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1031,15 +1031,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_local_replica_selection_fallback = false;
 
-
-    /**
-     * The timeout of executing async remote fragment.
-     * In normal case, the async remote fragment will be executed in a short time. If system are under high load
-     * condition,try to set this timeout longer.
-     */
-    @ConfField(mutable = true)
-    public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec
-
     /**
      * The number of query retries.
      * A query may retry if we encounter RPC exception and no result has been sent to user.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index df15bdc5d6..d2349b1765 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -71,7 +71,9 @@ public class RoutineLoadScheduler extends MasterDaemon {
             LOG.warn("failed to get need schedule routine jobs", e);
         }
 
-        LOG.info("there are {} job need schedule", routineLoadJobList.size());
+        if (!routineLoadJobList.isEmpty()) {
+            LOG.info("there are {} jobs to schedule", routineLoadJobList.size());
+        }
         for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
             RoutineLoadJob.JobState errorJobState = null;
             UserException userException = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d875fecbe6..01192f895a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -338,7 +338,7 @@ public class ReportHandler extends Daemon {
     }
 
     private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
-        LOG.info("begin to handle task report from backend {}", backendId);
+        LOG.debug("begin to handle task report from backend {}", backendId);
         long start = System.currentTimeMillis();
 
         if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7c050bdc2c..334f203609 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -57,7 +57,10 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.Types;
+import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -70,6 +73,7 @@ import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
@@ -103,6 +107,7 @@ import org.apache.commons.collections.map.HashedMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.jetbrains.annotations.NotNull;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -212,6 +217,10 @@ public class Coordinator {
     // parallel execute
     private final TUniqueId nextInstanceId;
 
+    // a timestamp representing the absolute timeout
+    // eg, System.currentTimeMillis() + query_timeout * 1000
+    private long timeoutDeadline;
+
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -223,6 +232,7 @@ public class Coordinator {
     // Runtime filter ID to the builder instance number
     public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
 
+
     // Used for query/insert
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
         this.isBlockQuery = planner.isBlockQuery();
@@ -490,15 +500,14 @@ public class Coordinator {
         PlanFragmentId topId = fragments.get(0).getFragmentId();
         FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
         DataSink topDataSink = topParams.fragment.getSink();
+        this.timeoutDeadline = System.currentTimeMillis() + queryOptions.query_timeout * 1000;
         if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
             TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-            receiver = new ResultReceiver(
-                    topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr),
-                    toBrpcHost(execBeAddr),
-                    queryOptions.query_timeout * 1000);
+            receiver = new ResultReceiver(topParams.instanceExecParams.get(0).instanceId,
+                    addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+                LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId),
+                        topParams.instanceExecParams.get(0).host);
             }
 
             if (topDataSink instanceof ResultFileSink
@@ -531,6 +540,31 @@ public class Coordinator {
         sendFragment();
     }
 
+    /**
+     * The logic for sending query plan fragments is as follows:
+     * First, plan fragments have dependencies. According to the order in the "fragments" list,
+     * it must be ensured that on the BE side, the next fragment instance can be executed
+     * only after the previous fragment instance is ready.
+     * <p>
+     * In the previous logic, we sent fragment instances in sequence through RPC,
+     * and waited for the RPC of the previous fragment instance to return successfully
+     * before sending the next one. For some complex queries, this could lead to too many RPCs.
+     * <p>
+     * The optimized logic is as follows:
+     * 1. If the number of fragments is <= 2, the original logic is still used
+     * to complete the sending of fragments through at most 2 RPCs.
+     * 2. If the number of fragments is >= 3, first group all fragment instances by BE,
+     * and send all fragment instances of a BE to it through the FIRST rpc. These fragment
+     * instances only perform the preparation phase and are not actually executed.
+     * After that, the execution of all fragment instances is started through the SECOND rpc.
+     * <p>
+     * After this optimization, a query sends at most two RPCs to each BE node,
+     * reducing the "send fragment timeout" errors caused by too many RPCs that the BE cannot process in time.
+     *
+     * @throws TException
+     * @throws RpcException
+     * @throws UserException
+     */
     private void sendFragment() throws TException, RpcException, UserException {
         lock();
         try {
@@ -540,23 +574,23 @@ public class Coordinator {
                     hostCounter.add(fi.host);
                 }
             }
-            // Execute all instances from up to bottom
-            // NOTICE: We must ensure that these fragments are executed sequentially,
-            // otherwise the data dependency between the fragments will be destroyed.
+
             int backendIdx = 0;
             int profileFragmentId = 0;
             long memoryLimit = queryOptions.getMemLimit();
+            Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
+            // If #fragments >=3, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // else use exec_plan_fragments directly.
+            boolean twoPhaseExecution = fragments.size() >= 3;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
 
-                // set up exec states
+                // 1. set up exec states
                 int instanceNum = params.instanceExecParams.size();
                 Preconditions.checkState(instanceNum > 0);
                 List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-                List<Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>>> futures =
-                        Lists.newArrayList();
 
-                // update memory limit for colocate join
+                // 2. update memory limit for colocate join
                 if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
                     int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
                     long newMemory = memoryLimit / rate;
@@ -574,15 +608,16 @@ public class Coordinator {
                     needCheckBackendState = true;
                 }
 
+                // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
                 int instanceId = 0;
                 for (TExecPlanFragmentParams tParam : tParams) {
-                    BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++,
-                            profileFragmentId, tParam, this.addressToBackendID);
-                    execState.unsetFields();
+                    BackendExecState execState =
+                            new BackendExecState(fragment.getFragmentId(), instanceId++, profileFragmentId, tParam, this.addressToBackendID);
                     // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have been executed based on this information.
                     tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
                     tParam.setBackendId(execState.backend.getId());
+                    tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
 
                     backendExecStates.add(execState);
                     if (needCheckBackendState) {
@@ -592,69 +627,33 @@ public class Coordinator {
                                     fragment.getFragmentId().asInt(), jobId);
                         }
                     }
-                    futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
 
-                    backendIdx++;
-                }
-
-                for (Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
-                    TStatusCode code;
-                    String errMsg = null;
-                    Exception exception = null;
-                    try {
-                        InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
-                                TimeUnit.MILLISECONDS);
-                        code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                        if (!result.getStatus().getErrorMsgsList().isEmpty()) {
-                            errMsg = result.getStatus().getErrorMsgsList().get(0);
-                        }
-                    } catch (ExecutionException e) {
-                        LOG.warn("catch a execute exception", e);
-                        exception = e;
-                        code = TStatusCode.THRIFT_RPC_ERROR;
-                        BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
-                    } catch (InterruptedException e) {
-                        LOG.warn("catch a interrupt exception", e);
-                        exception = e;
-                        code = TStatusCode.INTERNAL_ERROR;
-                    } catch (TimeoutException e) {
-                        LOG.warn("catch a timeout exception", e);
-                        exception = e;
-                        code = TStatusCode.TIMEOUT;
-                        BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
+                    BackendExecStates states = beToExecStates.get(execState.backend.getId());
+                    if (states == null) {
+                        states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress,
+                                twoPhaseExecution);
+                        beToExecStates.putIfAbsent(execState.backend.getId(), states);
                     }
+                    states.addState(execState);
+                }
+                profileFragmentId += 1;
+            } // end for fragments
 
-                    if (code != TStatusCode.OK) {
-                        if (exception != null) {
-                            errMsg = exception.getMessage();
-                        }
-
-                        if (errMsg == null) {
-                            errMsg = "exec rpc error. backend id: " + pair.first.backend.getId();
-                        }
-                        queryStatus.setStatus(errMsg);
-                        LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}",
-                                errMsg, code, fragment.getFragmentId(),
-                                pair.first.address.hostname, pair.first.address.port);
-                        cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
-                        switch (code) {
-                            case TIMEOUT:
-                                throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
-                                        + pair.first.backend.getId() + " fragment: " +
-                                        DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id));
-                            case THRIFT_RPC_ERROR:
-                                SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
-                                throw new RpcException(pair.first.backend.getHost(), "rpc failed");
-                            default:
-                                throw new UserException(errMsg);
-                        }
-                    }
+            // 4. send and wait fragments rpc
+            List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList();
+            for (BackendExecStates states : beToExecStates.values()) {
+                states.unsetFields();
+                futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
+            }
+            waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
 
-                    // succeed to send the plan fragment, update the "alreadySentBackendIds"
-                    alreadySentBackendIds.add(pair.first.backend.getId());
+            if (twoPhaseExecution) {
+                // 5. send and wait execution start rpc
+                futures.clear();
+                for (BackendExecStates states : beToExecStates.values()) {
+                    futures.add(Pair.create(states, states.execPlanFragmentStartAsync()));
                 }
-
-                profileFragmentId += 1;
+                waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
             }
 
             attachInstanceProfileToFragmentProfile();
@@ -663,6 +662,63 @@ public class Coordinator {
         }
     }
 
+    private void waitRpc(List<Pair<BackendExecStates, Future<PExecPlanFragmentResult>>> futures, long timeoutMs,
+            String operation) throws RpcException, UserException {
+        if (timeoutMs <= 0) {
+            throw new UserException("timeout before waiting for " + operation + " RPC. Elapsed(sec): " + (
+                    (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout));
+        }
+        
+        for (Pair<BackendExecStates, Future<PExecPlanFragmentResult>> pair : futures) {
+            TStatusCode code;
+            String errMsg = null;
+            Exception exception = null;
+            try {
+                PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS);
+                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                        errMsg = result.getStatus().getErrorMsgsList().get(0);
+                    } else {
+                        errMsg = operation + " failed. backend id: " + pair.first.beId;
+                    }
+                }
+            } catch (ExecutionException e) {
+                exception = e;
+                code = TStatusCode.THRIFT_RPC_ERROR;
+                BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr);
+            } catch (InterruptedException e) {
+                exception = e;
+                code = TStatusCode.INTERNAL_ERROR;
+            } catch (TimeoutException e) {
+                exception = e;
+                errMsg = "timeout when waiting for " + operation + " RPC. Elapsed(sec): "
+                        + ((System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout);
+                code = TStatusCode.TIMEOUT;
+            }
+
+            if (code != TStatusCode.OK) {
+                if (exception != null && errMsg == null) {
+                    errMsg = operation + " failed. " + exception.getMessage();
+                }
+                queryStatus.setStatus(errMsg);
+                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                switch (code) {
+                    case TIMEOUT:
+                        throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
+                    case THRIFT_RPC_ERROR:
+                        SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
+                        throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
+                    default:
+                        throw new UserException(errMsg, exception);
+                }
+            }
+
+            // succeed to send the plan fragment, update the "alreadySentBackendIds"
+            alreadySentBackendIds.add(pair.first.beId);
+        }
+    }
+
     public List<String> getExportFiles() {
         return exportFiles;
     }
@@ -1896,7 +1952,6 @@ public class Coordinator {
     public class BackendExecState {
         TExecPlanFragmentParams rpcParams;
         PlanFragmentId fragmentId;
-        int instanceId;
         boolean initiated;
         volatile boolean done;
         boolean hasCanceled;
@@ -1906,18 +1961,20 @@ public class Coordinator {
         TNetworkAddress address;
         Backend backend;
         long lastMissingHeartbeatTime = -1;
+        TUniqueId instanceId;
 
         public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId,
                                 TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
-            this.instanceId = instanceId;
             this.rpcParams = rpcParams;
             this.initiated = false;
             this.done = false;
             FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
+            this.instanceId = fi.instanceId;
             this.address = fi.host;
             this.backend = idToBackend.get(addressToBackendID.get(address));
+            this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
 
             String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")";
             this.profile = new RuntimeProfile(name);
@@ -2016,62 +2073,110 @@ public class Coordinator {
             return true;
         }
 
-        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
-            try {
-                brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            } catch (Exception e) {
-                throw new TException(e.getMessage());
+        public FragmentInstanceInfo buildFragmentInstanceInfo() {
+            return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
+                    .fragmentId(String.valueOf(fragmentId)).address(this.address).build();
+        }
+
+        private TUniqueId fragmentInstanceId() {
+            return this.rpcParams.params.getFragmentInstanceId();
+        }
+    }
+
+    /**
+     * A set of BackendExecState for the same Backend.
+     */
+    public class BackendExecStates {
+        long beId;
+        TNetworkAddress brpcAddr;
+        List<BackendExecState> states = Lists.newArrayList();
+        boolean twoPhaseExecution = false;
+
+        public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution) {
+            this.beId = beId;
+            this.brpcAddr = brpcAddr;
+            this.twoPhaseExecution = twoPhaseExecution;
+        }
+
+        public void addState(BackendExecState state) {
+            this.states.add(state);
+        }
+
+        /**
+         * The BackendExecStates in states are all sent to the same BE.
+         * So only the first BackendExecState needs to carry the common fields, such as the DescriptorTbl;
+         * the other BackendExecStates do not need those fields. Unset them to reduce the request size.
+         */
+        public void unsetFields() {
+            boolean first = true;
+            for (BackendExecState state : states) {
+                if (first) {
+                    first = false;
+                    continue;
+                }
+                state.unsetFields();
             }
-            this.initiated = true;
+        }
+
+        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync() throws TException {
             try {
-                return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
+                TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+                for (BackendExecState state : states) {
+                    paramsList.addToParamsList(state.rpcParams);
+                }
+                return BackendServiceProxy.getInstance()
+                        .execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with error code,
                 // so that the following logic will cancel the fragment.
-                return new Future<InternalService.PExecPlanFragmentResult>() {
-                    @Override
-                    public boolean cancel(boolean mayInterruptIfRunning) {
-                        return false;
-                    }
+                return futureWithException(e);
+            }
+        }
 
-                    @Override
-                    public boolean isCancelled() {
-                        return false;
-                    }
+        public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync() throws TException {
+            try {
+                PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder();
+                PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
+                builder.setQueryId(qid);
+                return BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, builder.build());
+            } catch (RpcException e) {
+                // DO NOT throw exception here, return a completed future with the error code,
+                // so that the following logic will cancel the fragment.
+                return futureWithException(e);
+            }
+        }
 
-                    @Override
-                    public boolean isDone() {
-                        return true;
-                    }
+        @NotNull
+        private Future<PExecPlanFragmentResult> futureWithException(RpcException e) {
+            return new Future<PExecPlanFragmentResult>() {
+                @Override
+                public boolean cancel(boolean mayInterruptIfRunning) {
+                    return false;
+                }
 
-                    @Override
-                    public InternalService.PExecPlanFragmentResult get() {
-                        InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult
-                                .newBuilder()
-                                .setStatus(Types.PStatus.newBuilder()
-                                        .addErrorMsgs(e.getMessage())
-                                        .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
-                                        .build())
-                                .build();
-                        return result;
-                    }
+                @Override
+                public boolean isCancelled() {
+                    return false;
+                }
 
-                    @Override
-                    public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
-                        return get();
-                    }
-                };
-            }
-        }
+                @Override
+                public boolean isDone() {
+                    return true;
+                }
 
-        public FragmentInstanceInfo buildFragmentInstanceInfo() {
-            return new QueryStatisticsItem.FragmentInstanceInfo.Builder()
-                    .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address)
-                    .build();
-        }
+                @Override
+                public PExecPlanFragmentResult get() {
+                    PExecPlanFragmentResult result = PExecPlanFragmentResult.newBuilder().setStatus(
+                            Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
+                                    .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
+                    return result;
+                }
 
-        private TUniqueId fragmentInstanceId() {
-            return this.rpcParams.params.getFragmentInstanceId();
+                @Override
+                public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
+                    return get();
+                }
+            };
         }
     }
 
@@ -2288,5 +2393,3 @@ public class Coordinator {
     }
 }
 
-
-
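
A note on futureWithException above: when the brpc stub cannot even be created, the coordinator
returns an already-completed Future carrying the error instead of throwing, so waitRpc handles it
through the same status-checking path as any other RPC result and cancels the query. A compact Java
sketch of that pattern using CompletableFuture, with illustrative types rather than the Doris protobufs:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    class CompletedErrorFutureSketch {
        enum Code { OK, THRIFT_RPC_ERROR }
        record Result(Code code, String errMsg) {}

        static Future<Result> execAsync(boolean stubCreationFails) {
            if (stubCreationFails) {
                // DO NOT throw here; complete the future with the error instead,
                // so the caller's uniform wait-and-check loop cancels the fragment.
                return CompletableFuture.completedFuture(
                        new Result(Code.THRIFT_RPC_ERROR, "failed to create brpc stub"));
            }
            return CompletableFuture.supplyAsync(() -> new Result(Code.OK, null));
        }

        public static void main(String[] args) throws Exception {
            System.out.println(execAsync(true).get().errMsg()); // handled like a normal result
        }
    }
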
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index ea0cb6cfda..41689a9b6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -85,8 +86,10 @@ public class InsertStreamTxnExecutor {
         txnEntry.setBackend(backend);
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
         try {
-            Future<InternalService.PExecPlanFragmentResult> future = BackendServiceProxy.getInstance().execPlanFragmentAsync(
-                    address, tRequest);
+            TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+            paramsList.addToParamsList(tRequest);
+            Future<InternalService.PExecPlanFragmentResult> future =
+                    BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
             InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 7978ebc2be..d713ed8a93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -48,11 +48,11 @@ public class ResultReceiver {
     private Long backendId;
     private Thread currentThread;
 
-    public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, int timeoutMs) {
+    public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs) {
         this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
-        this.timeoutTs = System.currentTimeMillis() + timeoutMs;
+        this.timeoutTs = timeoutTs;
     }
 
     public RowBatch getNext(Status status) throws TException {
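
The constructor change above switches ResultReceiver from a relative timeout (timeoutMs) to an absolute deadline (timeoutTs), so every receiver of one query shares a single budget instead of restarting the clock when it is created. A sketch of the call-site adjustment this implies; the variable names are illustrative, and getQueryTimeout() is assumed to be the usual TQueryOptions accessor returning seconds:

    // Fix one absolute deadline when the query starts, then hand the same
    // deadline to every receiver; later receivers cannot extend the timeout.
    long deadlineMs = System.currentTimeMillis() + queryOptions.getQueryTimeout() * 1000L;
    ResultReceiver receiver = new ResultReceiver(tid, backendId, address, deadlineMs);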
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 73309047d7..82bd1508bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -44,8 +44,7 @@ public class BackendServiceClient {
         this.address = address;
         channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
                 .flowControlWindow(Config.grpc_max_message_size_bytes)
-                .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
-                .enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
+                .maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
                 .usePlaintext().build();
         stub = PBackendServiceGrpc.newFutureStub(channel);
         blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
@@ -56,6 +55,16 @@ public class BackendServiceClient {
         return stub.execPlanFragment(request);
     }
 
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentPrepareAsync(
+            InternalService.PExecPlanFragmentRequest request) {
+        return stub.execPlanFragmentPrepare(request);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(
+            InternalService.PExecPlanFragmentStartRequest request) {
+        return stub.execPlanFragmentStart(request);
+    }
+
     public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
             InternalService.PCancelPlanFragmentRequest request) {
         return stub.cancelPlanFragment(request);
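
The two wrappers added above expose the prepare/start rpcs that internal_service.proto gains further down. Consuming them follows the same pattern the patch already uses for exec_plan_fragment results. In this sketch, client is a BackendServiceClient, pRequest a built PExecPlanFragmentRequest, and checkTimeoutMs a hypothetical remaining-time budget; none of these are taken verbatim from the patch:

    // Illustrative use of the new async stub: wait with a bounded budget and
    // translate the protobuf status code back into a TStatusCode.
    Future<InternalService.PExecPlanFragmentResult> f = client.execPlanFragmentPrepareAsync(pRequest);
    InternalService.PExecPlanFragmentResult res = f.get(checkTimeoutMs, TimeUnit.MILLISECONDS);
    TStatusCode code = TStatusCode.findByValue(res.getStatus().getStatusCode());
    if (code != TStatusCode.OK) {
        // fall back to cancelPlanFragmentAsync, as the Coordinator does on error
    }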
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index cbf43cac1e..95148aa8e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -19,8 +19,9 @@ package org.apache.doris.rpc;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.Types;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFoldConstantParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
@@ -93,43 +94,58 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
-            TNetworkAddress address, TExecPlanFragmentParams tRequest)
-            throws TException, RpcException {
-        InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException {
+        InternalService.PExecPlanFragmentRequest.Builder builder =
+                InternalService.PExecPlanFragmentRequest.newBuilder();
         if (Config.use_compact_thrift_rpc) {
-            builder.setRequest(ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(tRequest)));
+            builder.setRequest(
+                    ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList)));
             builder.setCompact(true);
         } else {
-            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
+            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(paramsList))).build();
             builder.setCompact(false);
         }
+        // VERSION_2 means we send a TExecPlanFragmentParamsList, not a single TExecPlanFragmentParams
+        builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2);
 
         final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
         try {
             final BackendServiceClient client = getProxy(address);
-            return client.execPlanFragmentAsync(pRequest);
+            if (twoPhaseExecution) {
+                return client.execPlanFragmentPrepareAsync(pRequest);
+            } else {
+                return client.execPlanFragmentAsync(pRequest);
+            }
         } catch (Throwable e) {
-            LOG.warn("Execute plan fragment catch a exception, address={}:{}",
-                    address.getHostname(), address.getPort(), e);
+            LOG.warn("Execute plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
+                    e);
             throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
-    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
-            TNetworkAddress address, TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason)
-            throws RpcException {
-        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
-                .newBuilder()
-                .setFinstId(
-                        Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
-                .setCancelReason(cancelReason).build();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(TNetworkAddress address,
+            PExecPlanFragmentStartRequest request) throws TException, RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.execPlanFragmentStartAsync(request);
+        } catch (Exception e) {
+            throw new RpcException(address.hostname, e.getMessage(), e);
+        }
+    }
+
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address,
+            TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest =
+                InternalService.PCancelPlanFragmentRequest.newBuilder()
+                        .setFinstId(Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
+                        .setCancelReason(cancelReason).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.cancelPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
-            LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
-                    address.getHostname(), address.getPort(), e);
+            LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
+                    e);
             throw new RpcException(address.hostname, e.getMessage());
         }
     }
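
Taken together, the proxy methods above drive the two-phase protocol roughly as sketched below. This is a simplified driver, not the actual Coordinator logic (which batches params per backend and budgets timeouts); checkOk, remainingMs, fragmentCount and addr are hypothetical, queryId is the query's TUniqueId, and the 3-fragment threshold is the one stated in the internal_service.proto comment further down:

    // Phase 1: ship all fragment params. With twoPhaseExecution set, the BE
    // only prepares the fragments and waits for an explicit start trigger.
    boolean twoPhase = fragmentCount >= 3;
    Future<InternalService.PExecPlanFragmentResult> prepare =
            BackendServiceProxy.getInstance().execPlanFragmentsAsync(addr, paramsList, twoPhase);
    checkOk(prepare.get(remainingMs, TimeUnit.MILLISECONDS));
    if (twoPhase) {
        // Phase 2: a small start RPC per backend, keyed only by the query id.
        InternalService.PExecPlanFragmentStartRequest start =
                InternalService.PExecPlanFragmentStartRequest.newBuilder()
                        .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build())
                        .build();
        checkOk(BackendServiceProxy.getInstance()
                .execPlanFragmentStartAsync(addr, start)
                .get(remainingMs, TimeUnit.MILLISECONDS));
    }

Per the thrift comment on need_wait_execution_trigger below, prepared fragments do not execute until the FE sends the start command, which is what lets the FE confirm that every backend received its plan before any fragment runs.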
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
index d65802633b..45c765f01b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
@@ -28,6 +28,11 @@ public class RpcException extends Exception {
         this.host = host;
     }
 
+    public RpcException(String host, String message, Exception e) {
+        super(message, e);
+        this.host = host;
+    }
+
     @Override
     public String getMessage() {
         if (Strings.isNullOrEmpty(host)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 3b1b4eddfc..4a7f4cf3c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
 import org.apache.doris.thrift.TStorageMedium;
@@ -253,7 +254,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
@@ -261,7 +263,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = commitFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any,
+                        (List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
 
@@ -324,7 +327,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
@@ -390,7 +394,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index bc3e7928fd..902de437b8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -337,7 +337,8 @@ public class MockedBackendFactory {
         }
 
         @Override
-        public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+        public void execPlanFragment(InternalService.PExecPlanFragmentRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
             System.out.println("get exec_plan_fragment request");
             responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
                     .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
@@ -345,7 +346,26 @@ public class MockedBackendFactory {
         }
 
         @Override
-        public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
+        public void execPlanFragmentPrepare(InternalService.PExecPlanFragmentRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+            System.out.println("get exec_plan_fragment_prepare request");
+            responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+                    .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void execPlanFragmentStart(InternalService.PExecPlanFragmentStartRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+            System.out.println("get exec_plan_fragment_start request");
+            responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+                    .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request,
+                StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
             System.out.println("get cancel_plan_fragment request");
             responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder()
                     .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 4d73bcb701..45662c10cd 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -128,11 +128,21 @@ message PTabletWriterCancelRequest {
 message PTabletWriterCancelResult {
 };
 
+enum PFragmentRequestVersion {
+    VERSION_1 = 1;  // only one TExecPlanFragmentParams in request
+    VERSION_2 = 2;  // multiple TExecPlanFragmentParams in request
+};
+
 message PExecPlanFragmentRequest {
     optional bytes request = 1;
     optional bool compact = 2;
+    optional PFragmentRequestVersion version = 3 [default = VERSION_2];
 };
 
+message PExecPlanFragmentStartRequest {
+    optional PUniqueId query_id = 1;
+}
+
 message PExecPlanFragmentResult {
     required PStatus status = 1;
 };
@@ -423,7 +433,11 @@ message PResetRPCChannelResponse {
 
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
+    // If the query has fewer than 3 fragments, use exec_plan_fragment directly.
+    // If it has 3 or more, use exec_plan_fragment_prepare + exec_plan_fragment_start.
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
+    rpc exec_plan_fragment_prepare(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
+    rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult);
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 19ae35a64e..aa1576b95a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -277,8 +277,18 @@ struct TTxnParams {
   10: optional double max_filter_ratio
 }
 
-// ExecPlanFragment
+// Definition of the global dict; the global dict is used to accelerate queries over low-cardinality data
+struct TColumnDict {
+  1: optional Types.TPrimitiveType type
+  2: list<string> str_dict  // map each string to an integer, using its offset as the id
+}
+
+struct TGlobalDict {
+  1: optional map<i32, TColumnDict> dicts,  // map dict_id to column dict
+  2: optional map<i32, i32> slot_dicts // map slot id to column dict id, because 2 or more columns may share a dict
+}
 
+// ExecPlanFragment
 struct TExecPlanFragmentParams {
   1: required PaloInternalServiceVersion protocol_version
 
@@ -330,9 +340,19 @@ struct TExecPlanFragmentParams {
 
   // If true, all @Common components is unset and should be got from BE's cache
   // If this field is unset or it set to false, all @Common components is set.
-  16: optional bool is_simplified_param
+  16: optional bool is_simplified_param = false;
   17: optional TTxnParams txn_conf
   18: optional i64 backend_id
+  19: optional TGlobalDict global_dict  // scan node could use the global dict to encode the string value to an integer
+
+  // If it is true, after this fragment is prepared on the BE side,
+  // it will wait for the FE to send the "start execution" command before it is actually executed.
+  // Otherwise, the fragment will start executing directly on the BE side.
+  20: optional bool need_wait_execution_trigger = false;
+}
+
+struct TExecPlanFragmentParamsList {
+    1: optional list<TExecPlanFragmentParams> paramsList;
 }
 
 struct TExecPlanFragmentResult {
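
On the TColumnDict comment above (each string maps to an integer given by its offset in str_dict): a toy illustration of the encoding idea in plain Java, with made-up values, not Doris code:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DictSketch {
        public static void main(String[] args) {
            // The code of a value is its offset in the dictionary list, so scans
            // can compare and group on small integers instead of strings.
            List<String> strDict = List.of("beijing", "shanghai", "shenzhen");
            Map<String, Integer> encode = new HashMap<>();
            for (int i = 0; i < strDict.size(); i++) {
                encode.put(strDict.get(i), i);
            }
            System.out.println(encode.get("shanghai"));  // 1
            System.out.println(strDict.get(1));          // decode: "shanghai"
        }
    }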



[incubator-doris] 03/03: [fix] fix grammar of ADMIN SHOW TABLET STORAGE FORMAT stmt (#9938)


commit a4252467c4c32801dbd94b5f2e19b96776f4f650
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Fri Jun 3 17:49:34 2022 +0800

    [fix] fix grammar of ADMIN SHOW TABLET STORAGE FORMAT stmt (#9938)
---
 fe/fe-core/src/main/cup/sql_parser.cup | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 5098d332fa..ce45f1cf56 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -5285,11 +5285,11 @@ admin_stmt ::=
     {:
         RESULT = new AdminDiagnoseTabletStmt(tabletId);
     :}
-    | KW_ADMIN KW_TABLET KW_STORAGE KW_FORMAT
+    | KW_ADMIN KW_SHOW KW_TABLET KW_STORAGE KW_FORMAT
     {:
         RESULT = new AdminShowTabletStorageFormatStmt(false);
     :}
-    | KW_ADMIN KW_TABLET KW_STORAGE KW_FORMAT KW_VERBOSE
+    | KW_ADMIN KW_SHOW KW_TABLET KW_STORAGE KW_FORMAT KW_VERBOSE
     {:
         RESULT = new AdminShowTabletStorageFormatStmt(true);
     :}
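
For reference, the corrected productions parse exactly these statements (the old productions accepted them only without the SHOW keyword, contradicting the statement's name):

    ADMIN SHOW TABLET STORAGE FORMAT;
    ADMIN SHOW TABLET STORAGE FORMAT VERBOSE;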



[incubator-doris] 02/03: [fix] (planner) slot nullable does not set correctly when plan outer join with inline view (#9927)


commit fd2375a59bea116098604912a6605423559b0966
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Fri Jun 3 17:50:10 2022 +0800

    [fix] (planner) slot nullable does not set correctly when plan outer join with inline view (#9927)
    
    - set the inline view's slot descriptor to nullable when registering column refs
    - propagate slot nullability when generating the inline view's query node in SingleNodePlanner
---
 .../java/org/apache/doris/analysis/Analyzer.java   |  9 ++-
 .../org/apache/doris/analysis/InlineViewRef.java   |  8 ++-
 .../apache/doris/planner/SingleNodePlanner.java    |  9 +++
 .../test_outer_join_with_inline_view.out           | 13 ++++
 .../test_outer_join_with_inline_view.groovy        | 72 ++++++++++++++++++++++
 5 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index b7da3f004b..c3e4e66984 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -737,12 +737,19 @@ public class Analyzer {
         String key = d.getAlias() + "." + col.getName();
         SlotDescriptor result = slotRefMap.get(key);
         if (result != null) {
+            // This is a trick to set the slot as nullable when the slot belongs to an inline view.
+            // When analyzing an InlineViewRef, we first generate sMap and baseTblSmap and then analyze the join.
+            // The column ref has already been registered at that point, but we did not yet know
+            // whether the inline view is outer joined, so we have to check it and set the slot as nullable here.
+            if (isOuterJoined(d.getId())) {
+                result.setIsNullable(true);
+            }
             result.setMultiRef(true);
             return result;
         }
         result = globalState.descTbl.addSlotDescriptor(d);
         result.setColumn(col);
-        if (col.isAllowNull() || globalState.outerJoinedTupleIds.containsKey(d.getId())) {
+        if (col.isAllowNull() || isOuterJoined(d.getId())) {
             result.setIsNullable(true);
         } else {
             result.setIsNullable(false);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
index a8ae56b61c..7bf43b78b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java
@@ -199,7 +199,7 @@ public class InlineViewRef extends TableRef {
             materializedTupleIds.add(desc.getId());
         }
 
-        // create smap_ and baseTblSmap_ and register auxiliary eq predicates between our
+        // create sMap and baseTblSmap and register auxiliary eq predicates between our
         // tuple descriptor's slots and our *unresolved* select list exprs;
         // we create these auxiliary predicates so that the analyzer can compute the value
         // transfer graph through this inline view correctly (ie, predicates can get
@@ -226,10 +226,14 @@ public class InlineViewRef extends TableRef {
             LOG.debug("inline view " + getUniqueAlias() + " baseTblSmap: " + baseTblSmap.debugString());
         }
 
-        // anlayzeLateralViewRefs
+        // analyzeLateralViewRefs
         analyzeLateralViewRef(analyzer);
 
         // Now do the remaining join analysis
+        // In general, we should analyze the join before registering column refs. However, we cannot move the
+        // join analysis before generating sMap and baseTblSmap, because generating them registers all column refs
+        // in the inline view. If the inline view is on the right side of a left semi join, an exception is thrown.
+        // Instead, we use a little trick in registerColumnRef to avoid this problem.
         analyzeJoin(analyzer);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 5236776f51..a3fe3345c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1375,6 +1375,15 @@ public class SingleNodePlanner {
             List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs(
                     outputSmap.getRhs(), rootNode.getTupleIds(), analyzer);
             outputSmap = new ExprSubstitutionMap(outputSmap.getLhs(), nullableRhs);
+            // When we process an outer join with inline views, we first set the inline view's slot descriptors
+            // to nullable. When we generate the plan, the inline view is removed, so the upper node's input is
+            // the inline view's child. We therefore also set the child's slot descriptors to nullable to keep
+            // the behavior consistent with a BaseTable.
+            for (TupleId tupleId : rootNode.getTupleIds()) {
+                for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getMaterializedSlots()) {
+                    slotDescriptor.setIsNullable(true);
+                }
+            }
         }
         // Set output smap of rootNode *before* creating a SelectNode for proper resolution.
         rootNode.setOutputSmap(outputSmap);
diff --git a/regression-test/data/correctness/test_outer_join_with_inline_view.out b/regression-test/data/correctness/test_outer_join_with_inline_view.out
new file mode 100644
index 0000000000..52a9fc4158
--- /dev/null
+++ b/regression-test/data/correctness/test_outer_join_with_inline_view.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_with_order_by --
+1	1	1	1
+2	2	2	2
+3	3	\N	\N
+4	4	\N	\N
+
+-- !select_with_agg_in_inline_view --
+1	1	1	1
+2	2	2	1
+3	3	\N	\N
+4	4	\N	\N
+
diff --git a/regression-test/suites/correctness/test_outer_join_with_inline_view.groovy b/regression-test/suites/correctness/test_outer_join_with_inline_view.groovy
new file mode 100644
index 0000000000..1a354ab0e3
--- /dev/null
+++ b/regression-test/suites/correctness/test_outer_join_with_inline_view.groovy
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outer_join_with_inline_view") {
+    sql """
+        drop table if exists ojwiv_t1;
+    """
+
+    sql """
+        drop table if exists ojwiv_t2;
+    """
+    
+    sql """
+        create table if not exists ojwiv_t1(
+          k1 int not null,
+          v1 int not null
+        )
+        distributed by hash(k1)
+        properties(
+          'replication_num' = '1'
+        );
+    """
+
+    sql """
+        create table if not exists ojwiv_t2(
+          k1 int not null,
+          c1 varchar(255) not null
+        )
+        distributed by hash(k1)
+        properties('replication_num' = '1');
+    """
+
+    sql """
+        insert into ojwiv_t1 values(1, 1), (2, 2), (3, 3), (4, 4);
+    """
+
+    sql """
+        insert into ojwiv_t2 values(1, '1'), (2, '2');
+    """
+
+    qt_select_with_order_by """
+        select * from
+          (select * from ojwiv_t1) a
+        left outer join
+          (select * from ojwiv_t2) b
+        on a.k1 = b.k1
+        order by a.v1; 
+    """
+
+    qt_select_with_agg_in_inline_view """
+        select * from
+          (select * from ojwiv_t1) a
+        left outer join
+          (select k1, count(distinct c1) from ojwiv_t2 group by k1) b
+        on a.k1 = b.k1
+        order by a.v1; 
+    """
+}

