You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/04 11:08:44 UTC

[doris] branch master updated: [Fix](lazy_open) fix lazy open commit info lose (#20404)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a1b7599f [Fix](lazy_open) fix lazy open commit info lose (#20404)
34a1b7599f is described below

commit 34a1b7599f60ff9f0674bde8ba1da581b0443221
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Sun Jun 4 19:08:36 2023 +0800

    [Fix](lazy_open) fix lazy open commit info lose (#20404)
---
 be/src/vec/sink/vtablet_sink.cpp                   |  23 ++++-
 be/src/vec/sink/vtablet_sink.h                     |   2 +
 .../test_materialized_view_lazy_open.groovy        | 103 ++++++++++++++++++++-
 3 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 9533ad639e..a5711f4c12 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -516,7 +516,8 @@ Status VNodeChannel::open_wait() {
 }
 
 void VNodeChannel::open_partition(int64_t partition_id) {
-    _timeout_watch.reset();
+    MonotonicStopWatch lazy_open_timeout_watch;
+    lazy_open_timeout_watch.start();
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     OpenPartitionRequest request;
     auto load_id = std::make_shared<PUniqueId>(_parent->_load_id);
@@ -533,7 +534,7 @@ void VNodeChannel::open_partition(int64_t partition_id) {
     auto open_partition_closure =
             std::make_unique<OpenPartitionClosure>(this, _index_channel, partition_id);
 
-    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time();
+    int remain_ms = _rpc_timeout_ms - lazy_open_timeout_watch.elapsed_time();
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         remain_ms = config::min_load_rpc_timeout_ms;
     }
@@ -546,6 +547,12 @@ void VNodeChannel::open_partition(int64_t partition_id) {
     request.release_id();
 }
 
+void VNodeChannel::open_partition_wait() {
+    for (auto& open_partition_closure : _open_partition_closures) {
+        open_partition_closure->join();
+    }
+}
+
 Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
@@ -884,9 +891,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
-    for (auto& open_partition_closure : _open_partition_closures) {
-        open_partition_closure->join();
-    }
     // set _is_closed to true finally
     Defer set_closed {[&]() {
         std::lock_guard<std::mutex> l(_closed_lock);
@@ -1407,6 +1411,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                 num_node_channels = 0;
         VNodeChannelStat channel_stat;
         {
+            if (config::enable_lazy_open_partition) {
+                for (auto index_channel : _channels) {
+                    index_channel->for_each_node_channel(
+                            [](const std::shared_ptr<VNodeChannel>& ch) {
+                                ch->open_partition_wait();
+                            });
+                }
+            }
+
             for (auto index_channel : _channels) {
                 index_channel->for_each_node_channel(
                         [](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index f9edeff693..1e91f4247f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -230,6 +230,8 @@ public:
 
     Status open_wait();
 
+    void open_partition_wait();
+
     Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
 
     int try_send_and_fetch_status(RuntimeState* state,
diff --git a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
index 064acdd9d4..599141d9c1 100644
--- a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
+++ b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
@@ -22,6 +22,8 @@ suite("test_materialized_view_lazy_open", "rollup") {
 
     def tbName1 = "test_materialized_view_lazy_open"
     def tbName2 = "test_materialized_view_lazy_open_dynamic_partition"
+    def tbName3 = "test_materialized_view_lazy_open_schema_change"
+    def tbName4 = "test_materialized_view_lazy_open_dynamic_partition_schema_change"
 
     def getJobState = { tableName ->
         def jobStateResult = sql """  SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """
@@ -44,7 +46,10 @@ suite("test_materialized_view_lazy_open", "rollup") {
                PARTITION p3 VALUES LESS THAN ("2020-01-01")
             )
             DISTRIBUTED BY HASH(k1) BUCKETS 32 
-            properties("replication_num" = "1");
+            properties(
+                "light_schema_change" = "false",
+                "replication_num" = "1"
+            );
         """
 
     sql "DROP TABLE IF EXISTS ${tbName2}"
@@ -70,6 +75,58 @@ suite("test_materialized_view_lazy_open", "rollup") {
                 "dynamic_partition.end" = "3",
                 "dynamic_partition.prefix" = "p",
                 "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "false",
+                "replication_num"="1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName3}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName3}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            DUPLICATE KEY(k1, k2)
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1) BUCKETS 32 
+
+            properties(
+                "light_schema_change" = "true",
+                "replication_num" = "1"
+            );
+        """
+    
+    sql "DROP TABLE IF EXISTS ${tbName4}"
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tbName4}(
+                k1 DATE,
+                k2 DECIMAL(10, 2),
+                k3 CHAR(10),
+                k4 INT NOT NULL
+            )
+            PARTITION BY RANGE(k1)
+            (
+               PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+               PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+               PARTITION p3 VALUES LESS THAN ("2020-01-01")
+            )
+            DISTRIBUTED BY HASH(k1)
+            PROPERTIES
+            (
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.start" = "-2147483648",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "32",
+                "light_schema_change" = "true",
                 "replication_num"="1"
             );
         """
@@ -106,12 +163,54 @@ suite("test_materialized_view_lazy_open", "rollup") {
         }
     }
 
+    sql "CREATE materialized VIEW test_lazy_open_schema_change AS SELECT k1 FROM ${tbName3} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName3)
+        if (res == "FINISHED") {
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
+    sql "CREATE materialized VIEW test_lazy_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} GROUP BY k1;"
+    max_try_secs = 60
+    while (max_try_secs--) {
+        String res = getJobState(tbName4)
+        if (res == "FINISHED") {
+            sleep(3000)
+            break
+        } else {
+            Thread.sleep(2000)
+            if (max_try_secs < 1) {
+                println "test timeout," + "state:" + res
+                assertEquals("FINISHED",res)
+            }
+        }
+    }
+
     sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);"
     sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);"
+
     sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);"
     sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);"
 
+    sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName3} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+    sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);"
+    sql "ALTER table ${tbName4} ADD COLUMN new_column INT;"
+    sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);"
+
     sql "DROP TABLE ${tbName1} FORCE;"
     sql "DROP TABLE ${tbName2} FORCE;"
-
+    sql "DROP TABLE ${tbName3} FORCE;"
+    sql "DROP TABLE ${tbName4} FORCE;"
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org