Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/04 12:45:33 UTC

[doris] branch branch-2.0-beta updated (5b026df60d -> e96a230c65)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a change to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 5b026df60d [typo](doc) Fixed typos in hive.md (#19457)
     new c9acb47808 [fix](community) fix PR template (#20400)
     new f70d5f63ba [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)
     new e96a230c65 [Fix](lazy_open) fix lazy open commit info lose (#20404)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/PULL_REQUEST_TEMPLATE.md                   |   2 +-
 be/src/runtime/fragment_mgr.cpp                    |  49 ++++++++--
 be/src/vec/sink/vtablet_sink.cpp                   |  23 ++++-
 be/src/vec/sink/vtablet_sink.h                     |   2 +
 .../test_materialized_view_lazy_open.groovy        | 103 ++++++++++++++++++++-
 5 files changed, 161 insertions(+), 18 deletions(-)


[doris] 02/03: [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f70d5f63baceca748c638a862050f0ef2ad57407
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sun Jun 4 12:10:35 2023 +0800

    [pipeline](opt) Opt fragment instance prepare performance by thread pool (#20399)
---
 be/src/runtime/fragment_mgr.cpp | 49 ++++++++++++++++++++++++++++++++---------
 1 file changed, 39 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 26646bbd0e..543b10f32a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -848,11 +848,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     VLOG_ROW << "query options is "
              << apache::thrift::ThriftDebugString(params.query_options).c_str();
 
-    std::shared_ptr<FragmentExecState> exec_state;
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
-
-    for (size_t i = 0; i < params.local_params.size(); i++) {
+    auto pre_and_submit = [&](int i) {
         const auto& local_params = params.local_params[i];
 
         const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
@@ -861,15 +859,14 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             auto iter = _pipeline_map.find(fragment_instance_id);
             if (iter != _pipeline_map.end()) {
                 // Duplicated
-                continue;
+                return Status::OK();
             }
+            query_ctx->fragment_ids.push_back(fragment_instance_id);
         }
         START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
         span->SetAttribute("instance_id", print_id(fragment_instance_id));
 
-        query_ctx->fragment_ids.push_back(fragment_instance_id);
-
-        exec_state.reset(new FragmentExecState(
+        std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
                 query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
                 query_ctx,
                 std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
@@ -912,10 +909,42 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
             _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
             _cv.notify_all();
         }
-        RETURN_IF_ERROR(context->submit());
-    }
 
-    return Status::OK();
+        return context->submit();
+    };
+
+    int target_size = params.local_params.size();
+    if (target_size > 1) {
+        int prepare_done = {0};
+        Status prepare_status[target_size];
+        std::mutex m;
+        std::condition_variable cv;
+
+        for (size_t i = 0; i < target_size; i++) {
+            _thread_pool->submit_func([&, i]() {
+                prepare_status[i] = pre_and_submit(i);
+                std::unique_lock<std::mutex> lock(m);
+                prepare_done++;
+                if (prepare_done == target_size) {
+                    cv.notify_one();
+                }
+            });
+        }
+
+        std::unique_lock<std::mutex> lock(m);
+        if (prepare_done != target_size) {
+            cv.wait(lock);
+
+            for (size_t i = 0; i < target_size; i++) {
+                if (!prepare_status[i].ok()) {
+                    return prepare_status[i];
+                }
+            }
+        }
+        return Status::OK();
+    } else {
+        return pre_and_submit(0);
+    }
 }
 
 template <typename Param>
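
The patch parallelizes fragment-instance preparation: instead of walking params.local_params serially, it wraps the per-instance work in a pre_and_submit(i) lambda, submits one task per instance to the fragment manager's thread pool via _thread_pool->submit_func, counts completions under a mutex, and blocks on a condition variable until the last task finishes before scanning the per-instance statuses. Below is a minimal standalone sketch of that fan-out/join pattern, using plain std::thread in place of Doris's thread pool and a hypothetical prepare(i) in place of pre_and_submit(i); it illustrates the shape of the change, not the actual Doris code.

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Hypothetical per-instance prepare step; stands in for pre_and_submit(i).
    static bool prepare(int i) {
        return i >= 0;  // pretend the work succeeded
    }

    // Fan the prepare calls out to worker threads, wait for all of them,
    // then surface any failure: the same shape as the patched code path.
    static bool prepare_all_parallel(int target_size) {
        std::vector<char> ok(target_size, 0);  // per-instance results
        int done = 0;
        std::mutex m;
        std::condition_variable cv;

        std::vector<std::thread> workers;  // a shared thread pool in Doris
        for (int i = 0; i < target_size; ++i) {
            workers.emplace_back([&, i] {
                bool r = prepare(i);              // run the work outside the lock
                std::lock_guard<std::mutex> lock(m);
                ok[i] = r ? 1 : 0;
                if (++done == target_size) {
                    cv.notify_one();              // last finisher wakes the waiter
                }
            });
        }

        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return done == target_size; });
        }
        for (auto& t : workers) {
            t.join();
        }

        for (int i = 0; i < target_size; ++i) {
            if (!ok[i]) {
                return false;                     // report the first failed instance
            }
        }
        return true;
    }

    int main() {
        return prepare_all_parallel(4) ? 0 : 1;
    }

The predicate overload of cv.wait() keeps the sketch robust against spurious wakeups, and the status scan runs unconditionally after the wait, whichever task happened to finish last; both points are worth keeping in mind when reading the patched loop above.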


[doris] 03/03: [Fix](lazy_open) fix lazy open commit info lose (#20404)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e96a230c65541f9245205c0827372aa242ec99cd
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Sun Jun 4 19:08:36 2023 +0800

    [Fix](lazy_open) fix lazy open commit info lose (#20404)
---
 be/src/vec/sink/vtablet_sink.cpp                   |  23 ++++-
 be/src/vec/sink/vtablet_sink.h                     |   2 +
 .../test_materialized_view_lazy_open.groovy        | 103 ++++++++++++++++++++-
 3 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 9533ad639e..a5711f4c12 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -516,7 +516,8 @@ Status VNodeChannel::open_wait() {
 }
 
 void VNodeChannel::open_partition(int64_t partition_id) {
-    _timeout_watch.reset();
+    MonotonicStopWatch lazy_open_timeout_watch;
+    lazy_open_timeout_watch.start();
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     OpenPartitionRequest request;
     auto load_id = std::make_shared<PUniqueId>(_parent->_load_id);
@@ -533,7 +534,7 @@ void VNodeChannel::open_partition(int64_t partition_id) {
     auto open_partition_closure =
             std::make_unique<OpenPartitionClosure>(this, _index_channel, partition_id);
 
-    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time();
+    int remain_ms = _rpc_timeout_ms - lazy_open_timeout_watch.elapsed_time();
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         remain_ms = config::min_load_rpc_timeout_ms;
     }
@@ -546,6 +547,12 @@ void VNodeChannel::open_partition(int64_t partition_id) {
     request.release_id();
 }
 
+void VNodeChannel::open_partition_wait() {
+    for (auto& open_partition_closure : _open_partition_closures) {
+        open_partition_closure->join();
+    }
+}
+
 Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
@@ -884,9 +891,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    for (auto& open_partition_closure : _open_partition_closures) {
-        open_partition_closure->join();
-    }
     // set _is_closed to true finally
     Defer set_closed {[&]() {
         std::lock_guard<std::mutex> l(_closed_lock);
@@ -1407,6 +1411,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                 num_node_channels = 0;
         VNodeChannelStat channel_stat;
         {
+            if (config::enable_lazy_open_partition) {
+                for (auto index_channel : _channels) {
+                    index_channel->for_each_node_channel(
+                            [](const std::shared_ptr<VNodeChannel>& ch) {
+                                ch->open_partition_wait();
+                            });
+                }
+            }
+
             for (auto index_channel : _channels) {
                 index_channel->for_each_node_channel(
                         [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index f9edeff693..1e91f4247f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -230,6 +230,8 @@ public:
 
     Status open_wait();
 
+    void open_partition_wait();
+
     Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
 
     int try_send_and_fetch_status(RuntimeState* state,
diff --git a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
index 064acdd9d4..599141d9c1 100644
--- a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
+++ b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
@@ -22,6 +22,8 @@ suite("test_materialized_view_lazy_open", "rollup") {
 
     def tbName1 = "test_materialized_view_lazy_open"
     def tbName2 = "test_materialized_view_lazy_open_dynamic_partition"
+    def tbName3 = "test_materialized_view_lazy_open_schema_change"
+    def tbName4 = "test_materialized_view_lazy_open_dynamic_partition_schema_change"
 
     def getJobState = { tableName ->
         def jobStateResult = sql """  SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """
@@ -44,7 +46,10 @@ suite("test_materialized_view_lazy_open", "rollup") {
                PARTITION p3 VALUES LESS THAN ("2020-01-01")
             )
             DISTRIBUTED BY HASH(k1) BUCKETS 32 
-            properties("replication_num" = "1");
+            properties(
+                "light_schema_change" = "false",
+                "replication_num" = "1"
+            );
         """
 
     sql "DROP TABLE IF EXISTS ${tbName2}"
@@ -70,6 +75,58 @@ suite("test_materialized_view_lazy_open", "rollup") {
                 "dynamic_partition.end" = "3",
                 "dynamic_partition.prefix" = "p",
                 "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "false",
+                "replication_num"="1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName3}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName3}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            DUPLICATE KEY(k1, k2)
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 32 
+
+            properties(
+                "light_schema_change" = "true",
+                "replication_num" = "1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName4}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName4}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1)
+            PROPERTIES
+            (
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.start" = "-2147483648",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "true",
                 "replication_num"="1"
             );
         """
@@ -106,12 +163,54 @@ suite("test_materialized_view_lazy_open", "rollup") {
         }
     }
 
+    sql "CREATE materialized VIEW test_lazy_open_schema_change AS SELECT k1 FROM ${tbName3} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName3)
+        if (res == "FINISHED") {
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "CREATE materialized VIEW test_lazy_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName4)
+        if (res == "FINISHED") {
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
     sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);"
     sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);"
+
     sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);"
     sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);"
 
+    sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName3} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+    sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName4} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);"
+
     sql "DROP TABLE ${tbName1} FORCE;"
     sql "DROP TABLE ${tbName2} FORCE;"
-
+    sql "DROP TABLE ${tbName3} FORCE;"
+    sql "DROP TABLE ${tbName4} FORCE;"
 }
\ No newline at end of file
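
The fix has two parts. First, open_partition() now uses a local MonotonicStopWatch for its RPC deadline instead of resetting the channel-wide _timeout_watch. Second, the join on outstanding open-partition closures moves out of VNodeChannel::close_wait() into a dedicated open_partition_wait(), which VOlapTableSink::close() invokes on every node channel before mark_close() whenever config::enable_lazy_open_partition is set, so the results of lazily opened partitions are in hand before the close/commit request goes out; that appears to be the "commit info lose" the title refers to. The regression test adds light_schema_change variants that run ADD COLUMN after the materialized view is built. A minimal sketch of the join-before-close ordering, with hypothetical stand-ins (an OpenPartitionClosure backed by std::future and a stripped-down NodeChannel) rather than Doris's real classes:

    #include <chrono>
    #include <future>
    #include <memory>
    #include <thread>
    #include <vector>

    // Hypothetical async-RPC handle: join() blocks until the RPC completes.
    struct OpenPartitionClosure {
        std::future<void> done;
        void join() { done.wait(); }
    };

    struct NodeChannel {
        std::vector<std::unique_ptr<OpenPartitionClosure>> open_partition_closures;

        // Block until every in-flight lazy "open partition" RPC has finished,
        // so nothing those RPCs report can be lost at commit time.
        void open_partition_wait() {
            for (auto& c : open_partition_closures) {
                c->join();
            }
        }

        void mark_close() { /* send the close/commit request */ }
    };

    // Close sequence mirroring the patch: join the lazy-open RPCs on every
    // channel first, then mark all channels closed.
    void close_all(std::vector<std::shared_ptr<NodeChannel>>& channels,
                   bool enable_lazy_open_partition) {
        if (enable_lazy_open_partition) {
            for (auto& ch : channels) {
                ch->open_partition_wait();
            }
        }
        for (auto& ch : channels) {
            ch->mark_close();
        }
    }

    int main() {
        auto ch = std::make_shared<NodeChannel>();
        auto closure = std::make_unique<OpenPartitionClosure>();
        closure->done = std::async(std::launch::async, [] {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));  // fake RPC
        });
        ch->open_partition_closures.push_back(std::move(closure));
        std::vector<std::shared_ptr<NodeChannel>> channels{ch};
        close_all(channels, /*enable_lazy_open_partition=*/true);
        return 0;
    }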


[doris] 01/03: [fix](community) fix PR template (#20400)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c9acb478086b7e493cc278822a3b22a8bd8d9791
Author: zwuis <42...@users.noreply.github.com>
AuthorDate: Sun Jun 4 08:44:32 2023 +0800

    [fix](community) fix PR template (#20400)
---
 .github/PULL_REQUEST_TEMPLATE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index fed38e0cd5..0a633554a0 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -2,7 +2,7 @@
 
 Issue Number: close #xxx
 
-<--Describe your changes.-->
+<!--Describe your changes.-->
 
 ## Further comments
 

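For context: "<--" is not valid HTML comment syntax, so GitHub rendered the placeholder "Describe your changes." as visible text in every new pull request description. The corrected "<!-- ... -->" form is a proper HTML comment, which stays hidden when the Markdown template is rendered.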

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org