You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/22 14:51:33 UTC

[incubator-doris] 08/13: [refactor](load) add tablet errors when close_wait return error (#9619)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 0e38fa4e9d4b18746987560d84f8b44c59501fa0
Author: pengxiangyu <di...@163.com>
AuthorDate: Sun May 22 21:27:42 2022 +0800

    [refactor](load) add tablet errors when close_wait return error (#9619)
---
 be/src/olap/delta_writer.cpp       |  21 +---
 be/src/olap/delta_writer.h         |   6 +-
 be/src/runtime/tablets_channel.cpp |  21 +++-
 be/src/runtime/tablets_channel.h   |   6 +-
 be/test/olap/delta_writer_test.cpp | 246 ++++++++++++++++++++++++++++++++++++-
 5 files changed, 271 insertions(+), 29 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 198019561f..58b89e5d64 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -279,9 +279,7 @@ OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                               google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                               bool is_broken) {
+OLAPStatus DeltaWriter::close_wait() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() being called";
@@ -291,14 +289,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
     }
 
     // return error if previous flush failed
-    OLAPStatus st = _flush_token->wait();
-    if (st != OLAP_SUCCESS) {
-        PTabletError* tablet_error = tablet_errors->Add();
-        tablet_error->set_tablet_id(_tablet->tablet_id());
-        tablet_error->set_msg("flush failed");
-        return st;
-    }
-    DCHECK_EQ(_mem_tracker->consumption(), 0);
+    RETURN_NOT_OK(_flush_token->wait());
 
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
@@ -314,14 +305,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
         return res;
     }
 
-#ifndef BE_TEST
-    if (!is_broken) {
-        PTabletInfo* tablet_info = tablet_vec->Add();
-        tablet_info->set_tablet_id(_tablet->tablet_id());
-        tablet_info->set_schema_hash(_tablet->schema_hash());
-    }
-#endif
-
     _delta_written_success = true;
 
     const FlushStatistic& stat = _flush_token->get_stats();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index cf5a2729d2..70164e56c8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,9 +67,7 @@ public:
     OLAPStatus close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    OLAPStatus close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                      google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                      bool is_broken);
+    OLAPStatus close_wait();
 
     // abandon current memtable and wait for all pending-flushing memtables to be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -91,6 +89,8 @@ public:
 
     int64_t tablet_id() { return _tablet->tablet_id(); }
 
+    int32_t schema_hash() { return _tablet->schema_hash(); } // forwards to the underlying _tablet
+
     int64_t save_mem_consumption_snapshot();
 
     int64_t get_mem_consumption_snapshot() const;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 926cba7aaa..19f342360c 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -198,9 +198,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
         for (auto writer : need_wait_writers) {
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE judge it.
-            writer->close_wait(
-                    tablet_vec, tablet_errors,
-                    (_broken_tablets.find(writer->tablet_id()) != _broken_tablets.end()));
+            _close_wait(writer, tablet_vec, tablet_errors);
         }
         // TODO(gaodayue) clear and destruct all delta writers to make sure all memory are freed
         // DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -208,6 +206,23 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
     return Status::OK();
 }
 
+// Wait for writer to close; add its tablet to tablet_errors on failure, else to tablet_vec.
+void TabletsChannel::_close_wait(DeltaWriter* writer,
+                                 google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                                 google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors) {
+    const OLAPStatus st = writer->close_wait();
+    if (st != OLAP_SUCCESS) {
+        PTabletError* tablet_error = tablet_errors->Add();
+        tablet_error->set_tablet_id(writer->tablet_id());
+        // std::to_string, not `"literal" + st`: the latter is pointer arithmetic on the literal.
+        tablet_error->set_msg("close wait failed: " + std::to_string(static_cast<int>(st)));
+    } else if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
+        PTabletInfo* tablet_info = tablet_vec->Add();
+        tablet_info->set_tablet_id(writer->tablet_id());
+        tablet_info->set_schema_hash(writer->schema_hash());
+    }
+}
+
 Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 360242ae88..cd5ee67867 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -89,7 +89,11 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-private:
+    // Wait for writer->close_wait(), then add its tablet to tablet_vec or to tablet_errors.
+    void _close_wait(DeltaWriter* writer,
+                     google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
+                     google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors);
+
     // id of this load channel
     TabletsChannelKey _key;
 
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 1e2cab2e1e..e51dbe9605 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -371,9 +371,18 @@ TEST_F(TestDeltaWriter, open) {
     DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
     res = delta_writer->close();
-    ASSERT_EQ(OLAP_SUCCESS, res);
-    res = delta_writer->close_wait(nullptr, false);
-    ASSERT_EQ(OLAP_SUCCESS, res);
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    SAFE_DELETE(delta_writer);
+
+    // test vec delta writer
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    EXPECT_NE(delta_writer, nullptr);
+    res = delta_writer->close();
+    EXPECT_EQ(OLAP_SUCCESS, res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(OLAP_SUCCESS, res);
     SAFE_DELETE(delta_writer);
 
     TDropTabletReq drop_request;
@@ -470,9 +479,149 @@ TEST_F(TestDeltaWriter, write) {
     }
 
     res = delta_writer->close();
+<<<<<<< HEAD
     ASSERT_EQ(OLAP_SUCCESS, res);
     res = delta_writer->close_wait(nullptr, false);
     ASSERT_EQ(OLAP_SUCCESS, res);
+=======
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(Status::OK(), res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, version);
+        EXPECT_EQ(Status::OK(), res);
+        res = tablet->add_inc_rowset(rowset);
+        EXPECT_EQ(Status::OK(), res);
+    }
+    EXPECT_EQ(1, tablet->num_rows());
+
+    auto tablet_id = 10003;
+    auto schema_hash = 270068375;
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    EXPECT_EQ(Status::OK(), res);
+    delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_write) {
+    TCreateTabletReq request;
+    create_tablet_request(10004, 270068376, &request);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    //     const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
+                              30002, load_id,   tuple_desc,      &(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    ASSERT_NE(delta_writer, nullptr);
+
+    auto tracker = std::make_shared<MemTracker>();
+    MemPool pool(tracker.get());
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+
+    auto columns = block.mutate_columns();
+    {
+        int8_t k1 = -127;
+        columns[0]->insert_data((const char*)&k1, sizeof(k1));
+
+        int16_t k2 = -32767;
+        columns[1]->insert_data((const char*)&k2, sizeof(k2));
+
+        int32_t k3 = -2147483647;
+        columns[2]->insert_data((const char*)&k3, sizeof(k3));
+
+        int64_t k4 = -9223372036854775807L;
+        columns[3]->insert_data((const char*)&k4, sizeof(k4));
+
+        int128_t k5 = -90000;
+        columns[4]->insert_data((const char*)&k5, sizeof(k5));
+
+        DateTimeValue k6;
+        k6.from_date_str("2048-11-10", 10);
+        auto k6_int = k6.to_int64();
+        columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int));
+
+        DateTimeValue k7;
+        k7.from_date_str("2636-08-16 19:39:43", 19);
+        auto k7_int = k7.to_int64();
+        columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int));
+
+        columns[7]->insert_data("abcd", 4);
+        columns[8]->insert_data("abcde", 5);
+
+        DecimalV2Value decimal_value;
+        decimal_value.assign_from_double(1.1);
+        columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+        int8_t v1 = -127;
+        columns[10]->insert_data((const char*)&v1, sizeof(v1));
+
+        int16_t v2 = -32767;
+        columns[11]->insert_data((const char*)&v2, sizeof(v2));
+
+        int32_t v3 = -2147483647;
+        columns[12]->insert_data((const char*)&v3, sizeof(v3));
+
+        int64_t v4 = -9223372036854775807L;
+        columns[13]->insert_data((const char*)&v4, sizeof(v4));
+
+        int128_t v5 = -90000;
+        columns[14]->insert_data((const char*)&v5, sizeof(v5));
+
+        DateTimeValue v6;
+        v6.from_date_str("2048-11-10", 10);
+        auto v6_int = v6.to_int64();
+        columns[15]->insert_data((const char*)&v6_int, sizeof(v6_int));
+
+        DateTimeValue v7;
+        v7.from_date_str("2636-08-16 19:39:43", 19);
+        auto v7_int = v7.to_int64();
+        columns[16]->insert_data((const char*)&v7_int, sizeof(v7_int));
+
+        columns[17]->insert_data("abcd", 4);
+        columns[18]->insert_data("abcde", 5);
+
+        decimal_value.assign_from_double(1.1);
+        columns[19]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
+
+        res = delta_writer->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+    }
+
+    res = delta_writer->close();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->close_wait();
+    ASSERT_TRUE(res.ok());
+>>>>>>> 75b3707a2 ([refactor](load) add tablet errors when close_wait return error (#9619))
 
     // publish version success
     TabletSharedPtr tablet =
@@ -548,9 +697,100 @@ TEST_F(TestDeltaWriter, sequence_col) {
     }
 
     res = delta_writer->close();
+<<<<<<< HEAD
     ASSERT_EQ(OLAP_SUCCESS, res);
     res = delta_writer->close_wait(nullptr, false);
     ASSERT_EQ(OLAP_SUCCESS, res);
+=======
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->close_wait();
+    EXPECT_EQ(Status::OK(), res);
+
+    // publish version success
+    TabletSharedPtr tablet =
+            k_engine->tablet_manager()->get_tablet(write_req.tablet_id, write_req.schema_hash);
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    Version version;
+    version.first = tablet->rowset_with_max_version()->end_version() + 1;
+    version.second = tablet->rowset_with_max_version()->end_version() + 1;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+    for (auto& tablet_rs : tablet_related_rs) {
+        RowsetSharedPtr rowset = tablet_rs.second;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_rs.first.tablet_uid, version);
+        EXPECT_EQ(Status::OK(), res);
+        res = tablet->add_inc_rowset(rowset);
+        EXPECT_EQ(Status::OK(), res);
+    }
+    EXPECT_EQ(1, tablet->num_rows());
+
+    auto tablet_id = 10005;
+    auto schema_hash = 270068377;
+    res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+    EXPECT_EQ(Status::OK(), res);
+    delete delta_writer;
+}
+
+TEST_F(TestDeltaWriter, vec_sequence_col) {
+    TCreateTabletReq request;
+    sleep(20);
+    create_tablet_request_with_sequence_col(10005, 270068377, &request);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+                              30003, load_id,   tuple_desc,      &(tuple_desc->slots())};
+    DeltaWriter* delta_writer = nullptr;
+    DeltaWriter::open(&write_req, &delta_writer, true);
+    ASSERT_NE(delta_writer, nullptr);
+
+    MemTracker tracker;
+    MemPool pool(&tracker);
+
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                       slot_desc->get_data_type_ptr(),
+                                                       slot_desc->col_name()));
+    }
+
+    auto columns = block.mutate_columns();
+    {
+        int8_t c1 = 123;
+        columns[0]->insert_data((const char*)&c1, sizeof(c1));
+
+        int16_t c2 = 456;
+        columns[1]->insert_data((const char*)&c2, sizeof(c2));
+
+        int32_t c3 = 1;
+        columns[2]->insert_data((const char*)&c3, sizeof(c2));
+
+        DateTimeValue c4;
+        c4.from_date_str("2020-07-16 19:39:43", 19);
+        int64_t c4_int = c4.to_int64();
+        columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
+
+        res = delta_writer->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+    }
+
+    res = delta_writer->close();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->close_wait();
+    ASSERT_TRUE(res.ok());
+>>>>>>> 75b3707a2 ([refactor](load) add tablet errors when close_wait return error (#9619))
 
     // publish version success
     TabletSharedPtr tablet =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org