You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/04 12:45:36 UTC
[doris] 03/03: [Fix](lazy_open) fix lazy open commit info loss (#20404)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit e96a230c65541f9245205c0827372aa242ec99cd
Author: HHoflittlefish777 <77...@users.noreply.github.com>
AuthorDate: Sun Jun 4 19:08:36 2023 +0800
[Fix](lazy_open) fix lazy open commit info loss (#20404)
---
be/src/vec/sink/vtablet_sink.cpp | 23 ++++-
be/src/vec/sink/vtablet_sink.h | 2 +
.../test_materialized_view_lazy_open.groovy | 103 ++++++++++++++++++++-
3 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 9533ad639e..a5711f4c12 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -516,7 +516,8 @@ Status VNodeChannel::open_wait() {
}
void VNodeChannel::open_partition(int64_t partition_id) {
- _timeout_watch.reset();
+ MonotonicStopWatch lazy_open_timeout_watch;
+ lazy_open_timeout_watch.start();
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
OpenPartitionRequest request;
auto load_id = std::make_shared<PUniqueId>(_parent->_load_id);
@@ -533,7 +534,7 @@ void VNodeChannel::open_partition(int64_t partition_id) {
auto open_partition_closure =
std::make_unique<OpenPartitionClosure>(this, _index_channel, partition_id);
- int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time();
+ int remain_ms = _rpc_timeout_ms - lazy_open_timeout_watch.elapsed_time();
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
remain_ms = config::min_load_rpc_timeout_ms;
}
@@ -546,6 +547,12 @@ void VNodeChannel::open_partition(int64_t partition_id) {
request.release_id();
}
+void VNodeChannel::open_partition_wait() {
+ for (auto& open_partition_closure : _open_partition_closures) {
+ open_partition_closure->join();
+ }
+}
+
Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
if (payload->second.empty()) {
@@ -884,9 +891,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
Status VNodeChannel::close_wait(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
- for (auto& open_partition_closure : _open_partition_closures) {
- open_partition_closure->join();
- }
// set _is_closed to true finally
Defer set_closed {[&]() {
std::lock_guard<std::mutex> l(_closed_lock);
@@ -1407,6 +1411,15 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
num_node_channels = 0;
VNodeChannelStat channel_stat;
{
+ if (config::enable_lazy_open_partition) {
+ for (auto index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [](const std::shared_ptr<VNodeChannel>& ch) {
+ ch->open_partition_wait();
+ });
+ }
+ }
+
for (auto index_channel : _channels) {
index_channel->for_each_node_channel(
[](const std::shared_ptr<VNodeChannel>& ch) { ch->mark_close(); });
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index f9edeff693..1e91f4247f 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -230,6 +230,8 @@ public:
Status open_wait();
+ void open_partition_wait();
+
Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
int try_send_and_fetch_status(RuntimeState* state,
diff --git a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
index 064acdd9d4..599141d9c1 100644
--- a/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
+++ b/regression-test/suites/rollup_p0/test_materialized_view_lazy_open.groovy
@@ -22,6 +22,8 @@ suite("test_materialized_view_lazy_open", "rollup") {
def tbName1 = "test_materialized_view_lazy_open"
def tbName2 = "test_materialized_view_lazy_open_dynamic_partition"
+ def tbName3 = "test_materialized_view_lazy_open_schema_change"
+ def tbName4 = "test_materialized_view_lazy_open_dynamic_partition_schema_change"
def getJobState = { tableName ->
def jobStateResult = sql """ SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """
@@ -44,7 +46,10 @@ suite("test_materialized_view_lazy_open", "rollup") {
PARTITION p3 VALUES LESS THAN ("2020-01-01")
)
DISTRIBUTED BY HASH(k1) BUCKETS 32
- properties("replication_num" = "1");
+ properties(
+ "light_schema_change" = "false",
+ "replication_num" = "1"
+ );
"""
sql "DROP TABLE IF EXISTS ${tbName2}"
@@ -70,6 +75,58 @@ suite("test_materialized_view_lazy_open", "rollup") {
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32",
+ "light_schema_change" = "false",
+ "replication_num"="1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tbName3}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName3}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ DUPLICATE KEY(k1, k2)
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 32
+
+ properties(
+ "light_schema_change" = "true",
+ "replication_num" = "1"
+ );
+ """
+
+ sql "DROP TABLE IF EXISTS ${tbName4}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tbName4}(
+ k1 DATE,
+ k2 DECIMAL(10, 2),
+ k3 CHAR(10),
+ k4 INT NOT NULL
+ )
+ PARTITION BY RANGE(k1)
+ (
+ PARTITION p1 VALUES LESS THAN ("2000-01-01"),
+ PARTITION p2 VALUES LESS THAN ("2010-01-01"),
+ PARTITION p3 VALUES LESS THAN ("2020-01-01")
+ )
+ DISTRIBUTED BY HASH(k1)
+ PROPERTIES
+ (
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-2147483648",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "32",
+ "light_schema_change" = "true",
"replication_num"="1"
);
"""
@@ -106,12 +163,54 @@ suite("test_materialized_view_lazy_open", "rollup") {
}
}
+ sql "CREATE materialized VIEW test_lazy_open_schema_change AS SELECT k1 FROM ${tbName3} GROUP BY k1;"
+ max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName3)
+ if (res == "FINISHED") {
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+
+ sql "CREATE materialized VIEW test_lazy_open_dynamic_partition_schema_change AS SELECT k1 FROM ${tbName4} GROUP BY k1;"
+ max_try_secs = 60
+ while (max_try_secs--) {
+ String res = getJobState(tbName4)
+ if (res == "FINISHED") {
+ sleep(3000)
+ break
+ } else {
+ Thread.sleep(2000)
+ if (max_try_secs < 1) {
+ println "test timeout," + "state:" + res
+ assertEquals("FINISHED",res)
+ }
+ }
+ }
+
sql "insert into ${tbName1} values('2000-05-20', 1.5, 'test', 1);"
sql "insert into ${tbName1} values('2010-05-20', 1.5, 'test', 1);"
+
sql "insert into ${tbName2} values('2000-05-20', 1.5, 'test', 1);"
sql "insert into ${tbName2} values('2010-05-20', 1.5, 'test', 1);"
+ sql "insert into ${tbName3} values('2000-05-20', 1.5, 'test', 1);"
+ sql "ALTER table ${tbName3} ADD COLUMN new_column INT;"
+ sql "insert into ${tbName3} values('2010-05-20', 1.5, 'test', 1, 1);"
+
+ sql "insert into ${tbName4} values('2000-05-20', 1.5, 'test', 1);"
+ sql "ALTER table ${tbName4} ADD COLUMN new_column INT;"
+ sql "insert into ${tbName4} values('2010-05-20', 1.5, 'test', 1, 1);"
+
sql "DROP TABLE ${tbName1} FORCE;"
sql "DROP TABLE ${tbName2} FORCE;"
-
+ sql "DROP TABLE ${tbName3} FORCE;"
+ sql "DROP TABLE ${tbName4} FORCE;"
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org