You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/22 14:51:33 UTC
[incubator-doris] 08/13: [refactor](load) add tablet errors when close_wait return error (#9619)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 0e38fa4e9d4b18746987560d84f8b44c59501fa0
Author: pengxiangyu <di...@163.com>
AuthorDate: Sun May 22 21:27:42 2022 +0800
[refactor](load) add tablet errors when close_wait return error (#9619)
---
be/src/olap/delta_writer.cpp | 21 +---
be/src/olap/delta_writer.h | 6 +-
be/src/runtime/tablets_channel.cpp | 21 +++-
be/src/runtime/tablets_channel.h | 6 +-
be/test/olap/delta_writer_test.cpp | 246 ++++++++++++++++++++++++++++++++++++-
5 files changed, 271 insertions(+), 29 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 198019561f..58b89e5d64 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -279,9 +279,7 @@ OLAPStatus DeltaWriter::close() {
return OLAP_SUCCESS;
}
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
- bool is_broken) {
+OLAPStatus DeltaWriter::close_wait() {
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
@@ -291,14 +289,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
}
// return error if previous flush failed
- OLAPStatus st = _flush_token->wait();
- if (st != OLAP_SUCCESS) {
- PTabletError* tablet_error = tablet_errors->Add();
- tablet_error->set_tablet_id(_tablet->tablet_id());
- tablet_error->set_msg("flush failed");
- return st;
- }
- DCHECK_EQ(_mem_tracker->consumption(), 0);
+ RETURN_NOT_OK(_flush_token->wait());
// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
@@ -314,14 +305,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
return res;
}
-#ifndef BE_TEST
- if (!is_broken) {
- PTabletInfo* tablet_info = tablet_vec->Add();
- tablet_info->set_tablet_id(_tablet->tablet_id());
- tablet_info->set_schema_hash(_tablet->schema_hash());
- }
-#endif
-
_delta_written_success = true;
const FlushStatistic& stat = _flush_token->get_stats();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index cf5a2729d2..70164e56c8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,9 +67,7 @@ public:
OLAPStatus close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
- OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
- google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
- bool is_broken);
+ OLAPStatus close_wait();
// abandon current memtable and wait for all pending-flushing memtables to be destructed.
// mem_consumption() should be 0 after this function returns.
@@ -91,6 +89,8 @@ public:
int64_t tablet_id() { return _tablet->tablet_id(); }
+ int32_t schema_hash() { return _tablet->schema_hash(); }
+
int64_t save_mem_consumption_snapshot();
int64_t get_mem_consumption_snapshot() const;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 926cba7aaa..19f342360c 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -198,9 +198,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
for (auto writer : need_wait_writers) {
// close may return failed, but no need to handle it here.
// tablet_vec will only contains success tablet, and then let FE judge it.
- writer->close_wait(
- tablet_vec, tablet_errors,
- (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
+ _close_wait(writer, tablet_vec, tablet_errors);
}
// TODO(gaodayue) clear and destruct all delta writers to make sure all memory are freed
// DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -208,6 +206,23 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
return Status::OK();
}
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+ google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+ google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+ OLAPStatus st = writer->close_wait();
+ if (st != OLAP_SUCCESS) {
+ if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
+ PTabletInfo* tablet_info = tablet_vec->Add();
+ tablet_info->set_tablet_id(writer->tablet_id());
+ tablet_info->set_schema_hash(writer->schema_hash());
+ }
+ } else {
+ PTabletError* tablet_error = tablet_errors->Add();
+ tablet_error->set_tablet_id(writer->tablet_id());
+ tablet_error->set_msg("close wait failed: " + st);
+ }
+}
+
Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 360242ae88..cd5ee67867 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -89,7 +89,11 @@ private:
// open all writer
Status _open_all_writers(const PTabletWriterOpenRequest& request);
-private:
+ // deal with DeltaWriter close_wait(), add tablet to list for return.
+ void _close_wait(DeltaWriter* writer,
+ google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+ google::protobuf::RepeatedPtrField<PTabletError>* tablet_error);
+
// id of this load channel
TabletsChannelKey _key;
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 1e2cab2e1e..e51dbe9605 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -371,9 +371,18 @@ TEST_F(TestDeltaWriter, open) {
DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
res = delta_writer->close();
- ASSERT_EQ(OLAP_SUCCESS, res);
- res = delta_writer->close_wait(nullptr, false);
- ASSERT_EQ(OLAP_SUCCESS, res);
+ EXPECT_EQ(OLAP_SUCCESS, res);
+ res = delta_writer->close_wait();
+ EXPECT_EQ(OLAP_SUCCESS, res);
+ SAFE_DELETE(delta_writer);
+
+ // test vec delta writer
+ DeltaWriter::open(&write_req, &delta_writer, true);
+ EXPECT_NE(delta_writer, nullptr);
+ res = delta_writer->close();
+ EXPECT_EQ(OLAP_SUCCESS, res);
+ res = delta_writer->close_wait();
+ EXPECT_EQ(OLAP_SUCCESS, res);
SAFE_DELETE(delta_writer);
TDropTabletReq drop_request;
@@ -470,9 +479,149 @@ TEST_F(TestDeltaWriter, write) {
}
res = delta_writer->close();
+<<<<<<< HEAD
ASSERT_EQ(OLAP_SUCCESS, res);
res = delta_writer->close_wait(nullptr, false);
ASSERT_EQ(OLAP_SUCCESS, res);
+=======
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->close_wait();
+ EXPECT_EQ(Status::OK(), res);
+
+ // publish version success
+ TabletSharedPtr tablet =
+ k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+ OlapMeta* meta = tablet->data_dir()->get_meta();
+ Version version;
+ version.first = tablet->rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+ StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+ write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+ for (auto& tablet_rs : tablet_related_rs) {
+ RowsetSharedPtr rowset = tablet_rs.second;
+ res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+ write_req.tablet_id, write_req.schema_hash,
+ tablet_rs.first.tablet_uid, version);
+ EXPECT_EQ(Status::OK(), res);
+ res = tablet->add_inc_rowset(rowset);
+ EXPECT_EQ(Status::OK(), res);
+ }
+ EXPECT_EQ(1, tablet->num_rows());
+
+ auto tablet_id = 10003;
+ auto schema_hash = 270068375;
+ res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+ EXPECT_EQ(Status::OK(), res);
+ delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_write) {
+ TCreateTabletReq request;
+ create_tablet_request(10004, 270068376, &request);
+ Status res = k_engine->create_tablet(request);
+ ASSERT_TRUE(res.ok());
+
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ // const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+ PUniqueId load_id;
+ load_id.set_hi(0);
+ load_id.set_lo(0);
+ WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
+ 30002, load_id, tuple_desc, &(tuple_desc->slots())};
+ DeltaWriter* delta_writer = nullptr;
+ DeltaWriter::open(&write_req, &delta_writer, true);
+ ASSERT_NE(delta_writer, nullptr);
+
+ auto tracker = std::make_shared<MemTracker>();
+ MemPool pool(tracker.get());
+
+ vectorized::Block block;
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+ slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
+ }
+
+ auto columns = block.mutate_columns();
+ {
+ int8_t k1 = -127;
+ columns[0]->insert_data((const char*)&k1, sizeof(k1));
+
+ int16_t k2 = -32767;
+ columns[1]->insert_data((const char*)&k2, sizeof(k2));
+
+ int32_t k3 = -2147483647;
+ columns[2]->insert_data((const char*)&k3, sizeof(k3));
+
+ int64_t k4 = -9223372036854775807L;
+ columns[3]->insert_data((const char*)&k4, sizeof(k4));
+
+ int128_t k5 = -90000;
+ columns[4]->insert_data((const char*)&k5, sizeof(k5));
+
+ DateTimeValue k6;
+ k6.from_date_str("2048-11-10", 10);
+ auto k6_int = k6.to_int64();
+ columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int));
+
+ DateTimeValue k7;
+ k7.from_date_str("2636-08-16 19:39:43", 19);
+ auto k7_int = k7.to_int64();
+ columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int));
+
+ columns[7]->insert_data("abcd", 4);
+ columns[8]->insert_data("abcde", 5);
+
+ DecimalV2Value decimal_value;
+ decimal_value.assign_from_double(1.1);
+ columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+ int8_t v1 = -127;
+ columns[10]->insert_data((const char*)&v1, sizeof(v1));
+
+ int16_t v2 = -32767;
+ columns[11]->insert_data((const char*)&v2, sizeof(v2));
+
+ int32_t v3 = -2147483647;
+ columns[12]->insert_data((const char*)&v3, sizeof(v3));
+
+ int64_t v4 = -9223372036854775807L;
+ columns[13]->insert_data((const char*)&v4, sizeof(v4));
+
+ int128_t v5 = -90000;
+ columns[14]->insert_data((const char*)&v5, sizeof(v5));
+
+ DateTimeValue v6;
+ v6.from_date_str("2048-11-10", 10);
+ auto v6_int = v6.to_int64();
+ columns[15]->insert_data((const char*)&v6_int, sizeof(v6_int));
+
+ DateTimeValue v7;
+ v7.from_date_str("2636-08-16 19:39:43", 19);
+ auto v7_int = v7.to_int64();
+ columns[16]->insert_data((const char*)&v7_int, sizeof(v7_int));
+
+ columns[17]->insert_data("abcd", 4);
+ columns[18]->insert_data("abcde", 5);
+
+ decimal_value.assign_from_double(1.1);
+ columns[19]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+ res = delta_writer->write(&block, {0});
+ ASSERT_TRUE(res.ok());
+ }
+
+ res = delta_writer->close();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->close_wait();
+ ASSERT_TRUE(res.ok());
+>>>>>>> 75b3707a2 ([refactor](load) add tablet errors when close_wait return error (#9619))
// publish version success
TabletSharedPtr tablet =
@@ -548,9 +697,100 @@ TEST_F(TestDeltaWriter, sequence_col) {
}
res = delta_writer->close();
+<<<<<<< HEAD
ASSERT_EQ(OLAP_SUCCESS, res);
res = delta_writer->close_wait(nullptr, false);
ASSERT_EQ(OLAP_SUCCESS, res);
+=======
+ EXPECT_EQ(Status::OK(), res);
+ res = delta_writer->close_wait();
+ EXPECT_EQ(Status::OK(), res);
+
+ // publish version success
+ TabletSharedPtr tablet =
+ k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+ OlapMeta* meta = tablet->data_dir()->get_meta();
+ Version version;
+ version.first = tablet->rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+ StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+ write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+ for (auto& tablet_rs : tablet_related_rs) {
+ RowsetSharedPtr rowset = tablet_rs.second;
+ res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+ write_req.tablet_id, write_req.schema_hash,
+ tablet_rs.first.tablet_uid, version);
+ EXPECT_EQ(Status::OK(), res);
+ res = tablet->add_inc_rowset(rowset);
+ EXPECT_EQ(Status::OK(), res);
+ }
+ EXPECT_EQ(1, tablet->num_rows());
+
+ auto tablet_id = 10005;
+ auto schema_hash = 270068377;
+ res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+ EXPECT_EQ(Status::OK(), res);
+ delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_sequence_col) {
+ TCreateTabletReq request;
+ sleep(20);
+ create_tablet_request_with_sequence_col(10005, 270068377, &request);
+ Status res = k_engine->create_tablet(request);
+ ASSERT_TRUE(res.ok());
+
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+
+ PUniqueId load_id;
+ load_id.set_hi(0);
+ load_id.set_lo(0);
+ WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+ 30003, load_id, tuple_desc, &(tuple_desc->slots())};
+ DeltaWriter* delta_writer = nullptr;
+ DeltaWriter::open(&write_req, &delta_writer, true);
+ ASSERT_NE(delta_writer, nullptr);
+
+ MemTracker tracker;
+ MemPool pool(&tracker);
+
+ vectorized::Block block;
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+ slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
+ }
+
+ auto columns = block.mutate_columns();
+ {
+ int8_t c1 = 123;
+ columns[0]->insert_data((const char*)&c1, sizeof(c1));
+
+ int16_t c2 = 456;
+ columns[1]->insert_data((const char*)&c2, sizeof(c2));
+
+ int32_t c3 = 1;
+ columns[2]->insert_data((const char*)&c3, sizeof(c2));
+
+ DateTimeValue c4;
+ c4.from_date_str("2020-07-16 19:39:43", 19);
+ int64_t c4_int = c4.to_int64();
+ columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
+
+ res = delta_writer->write(&block, {0});
+ ASSERT_TRUE(res.ok());
+ }
+
+ res = delta_writer->close();
+ ASSERT_TRUE(res.ok());
+ res = delta_writer->close_wait();
+ ASSERT_TRUE(res.ok());
+>>>>>>> 75b3707a2 ([refactor](load) add tablet errors when close_wait return error (#9619))
// publish version success
TabletSharedPtr tablet =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org