You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/21 01:50:20 UTC

[doris] branch master updated: [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new efdc73777a [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
efdc73777a is described below

commit efdc73777a21275b68c52970b7871ad1582e74f5
Author: Xin Liao <li...@126.com>
AuthorDate: Wed Dec 21 09:50:13 2022 +0800

    [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
    
    Data inconsistency between multiple replicas is very difficult to investigate.
    When loading data, the row counts of the different replicas are now compared to avoid some data inconsistency problems.
---
 be/src/exec/tablet_sink.cpp               | 41 +++++++++++++++++++++++++++++++
 be/src/exec/tablet_sink.h                 | 11 +++++++++
 be/src/olap/delta_writer.cpp              | 10 ++++++++
 be/src/olap/delta_writer.h                |  7 ++++++
 be/src/olap/memtable.cpp                  |  2 ++
 be/src/olap/memtable.h                    |  2 ++
 be/src/olap/rowset/beta_rowset_writer.cpp |  1 +
 be/src/vec/sink/vtablet_sink.cpp          |  4 +++
 gensrc/proto/internal_service.proto       |  2 ++
 9 files changed, 80 insertions(+)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 6e835228da..106623fa93 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -247,6 +247,10 @@ Status NodeChannel::open_wait() {
                         commit_info.tabletId = tablet.tablet_id();
                         commit_info.backendId = _node_id;
                         _tablet_commit_infos.emplace_back(std::move(commit_info));
+                        if (tablet.has_received_rows()) {
+                            _tablets_received_rows.emplace_back(tablet.tablet_id(),
+                                                                tablet.received_rows());
+                        }
                         VLOG_CRITICAL
                                 << "master replica commit info: tabletId=" << tablet.tablet_id()
                                 << ", backendId=" << _node_id
@@ -457,6 +461,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
                                             std::make_move_iterator(_tablet_commit_infos.end()));
 
         _index_channel->set_error_tablet_in_state(state);
+        _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
         return Status::OK();
     }
 
@@ -769,6 +774,39 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
     }
 }
 
+// For each (tablet_id, rows_num) pair, appends (node_id, rows_num) to that tablet's entry.
+void IndexChannel::set_tablets_received_rows(
+        const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) {
+    for (const auto& reported : tablets_received_rows)
+        _tablets_received_rows[reported.first].emplace_back(node_id, reported.second);
+}
+
+// Compares the received row count of every replica of a tablet against the first replica
+// recorded for that tablet; returns InternalError on the first mismatch, otherwise OK.
+Status IndexChannel::check_tablet_received_rows_consistency() {
+    for (const auto& [tablet_id, replicas] : _tablets_received_rows) {
+        for (const auto& [node_id, rows_num] : replicas) {
+            VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id
+                        << ", txn_id: " << std::to_string(_parent->_txn_id)
+                        << ", tablet_id: " << tablet_id
+                        << ", node_id: " << node_id
+                        << ", rows_num: " << rows_num;
+            // the first replica is the reference; comparing it with itself never fails
+            if (rows_num != replicas.front().second) {
+                LOG(WARNING) << "rows num doesn't match, load_id: " << _parent->_load_id
+                             << ", txn_id: " << std::to_string(_parent->_txn_id)
+                             << ", tablet_id: " << tablet_id
+                             << ", node_id: " << node_id
+                             << ", rows_num: " << rows_num
+                             << ", node_id: " << replicas.front().first
+                             << ", rows_num: " << replicas.front().second;
+                return Status::InternalError("rows num written by multi replicas doesn't match");
+            }
+        }
+    }
+    return Status::OK();
+}
+
 OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                              const std::vector<TExpr>& texprs, Status* status)
         : _pool(pool),
@@ -1155,6 +1193,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                 Status index_st = index_channel->check_intolerable_failure();
                 if (!index_st.ok()) {
                     status = index_st;
+                } else if (Status st = index_channel->check_tablet_received_rows_consistency();
+                           !st.ok()) {
+                    status = st;
                 }
             } // end for index channels
         }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index a177db8a9b..2216c89fff 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -321,6 +321,8 @@ protected:
     bool _is_closed = false;
 
     RuntimeState* _state;
+    // rows number received per tablet, tablet_id -> rows_num
+    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
 
 private:
     std::unique_ptr<RowBatch> _cur_batch;
@@ -368,6 +370,12 @@ public:
         return mem_consumption;
     }
 
+    void set_tablets_received_rows(
+            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+
+    // check whether the rows num written by different replicas is consistent
+    Status check_tablet_received_rows_consistency();
+
 private:
     friend class NodeChannel;
     friend class VNodeChannel;
@@ -398,6 +406,9 @@ private:
     Status _intolerable_failure_status = Status::OK();
 
     std::unique_ptr<MemTracker> _index_channel_tracker;
+    // rows num received by DeltaWriter per tablet, tablet_id -> <node_id, rows_num>
+    // used to verify whether the rows num received by different replicas is consistent
+    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
 };
 
 template <typename Row>
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 6b0cfb3e10..2a0fded006 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -167,6 +167,7 @@ Status DeltaWriter::write(Tuple* tuple) {
         return _cancel_status;
     }
 
+    _total_received_rows++;
     _mem_table->insert(tuple);
 
     // if memtable is full, push it to the flush executor,
@@ -192,6 +193,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
         return _cancel_status;
     }
 
+    _total_received_rows += row_idxs.size();
     for (const auto& row_idx : row_idxs) {
         _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
     }
@@ -220,6 +222,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         return _cancel_status;
     }
 
+    _total_received_rows += row_idxs.size();
     _mem_table->insert(block, row_idxs);
 
     if (_mem_table->need_to_agg()) {
@@ -237,6 +240,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
 }
 
 Status DeltaWriter::_flush_memtable_async() {
+    _merged_rows += _mem_table->merged_rows();
     return _flush_token->submit(std::move(_mem_table));
 }
 
@@ -358,6 +362,12 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
 
     _mem_table.reset();
 
+    if (_rowset_writer->num_rows() + _merged_rows != _total_received_rows) {
+        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
+                     << _rowset_writer->num_rows() << ", merged_rows: " << _merged_rows
+                     << ", total received rows: " << _total_received_rows;
+        return Status::InternalError("rows number written by delta writer dosen't match");
+    }
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
     if (_cur_rowset == nullptr) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 5484c7c7db..dafd77a8f8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -110,6 +110,8 @@ public:
 
     void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
+    int64_t total_received_rows() const { return _total_received_rows; }
+
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
                 bool is_vec);
@@ -172,6 +174,11 @@ private:
     RowsetIdUnorderedSet _rowset_ids;
     // current max version, used to calculate delete bitmap
     int64_t _cur_max_version;
+
+    // total rows num received by DeltaWriter
+    int64_t _total_received_rows = 0;
+    // rows num merged by memtable
+    int64_t _merged_rows = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3881ce8dce..63575a3bb2 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -220,6 +220,7 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
 
     bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint);
     if (is_exist) {
+        _merged_rows++;
         _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key);
     } else {
         row_in_block->init_agg_places(
@@ -249,6 +250,7 @@ void MemTable::_insert_agg(const Tuple* tuple) {
 
     bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
+        _merged_rows++;
         (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
         tuple_buf = _table_mem_pool->allocate(_schema_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index e7f59ff151..27606f3552 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -70,6 +70,7 @@ public:
     Status close();
 
     int64_t flush_size() const { return _flush_size; }
+    int64_t merged_rows() const { return _merged_rows; }
 
 private:
     Status _do_flush(int64_t& duration_ns);
@@ -202,6 +203,7 @@ private:
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggregate key model.
     int64_t _rows = 0;
+    int64_t _merged_rows = 0;
     void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
     void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
                                             TableKey row_in_skiplist) = nullptr;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8da5ca117a..e6ee1833b4 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -661,6 +661,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
         }
         ContiguousRow dst_row = it.get_current_row();
         auto s = writer->append_row(dst_row);
+        _raw_num_rows_written++;
         if (PREDICT_FALSE(!s.ok())) {
             LOG(WARNING) << "failed to append row: " << s.to_string();
             return Status::Error<WRITER_DATA_WRITE_ERROR>();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index de59e0b5a7..ae799e46d9 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -135,6 +135,10 @@ Status VNodeChannel::open_wait() {
                     commit_info.tabletId = tablet.tablet_id();
                     commit_info.backendId = _node_id;
                     _tablet_commit_infos.emplace_back(std::move(commit_info));
+                    if (tablet.has_received_rows()) {
+                        _tablets_received_rows.emplace_back(tablet.tablet_id(),
+                                                            tablet.received_rows());
+                    }
                     VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
                                   << ", backendId=" << _node_id
                                   << ", master node id: " << this->node_id()
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index ff05509e67..552786f313 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -65,6 +65,8 @@ message PTabletInfo {
     // Delta Writer will write data to local disk and then check if there are new raw values not in global dict
     // if appears, then it should add the column name to this vector
     repeated string invalid_dict_cols = 3; 
+    // total rows num received by DeltaWriter
+    optional int64 received_rows = 4;
 }
 
 // open a tablet writer


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org