You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/21 01:50:20 UTC
[doris] branch master updated: [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new efdc73777a [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
efdc73777a is described below
commit efdc73777a21275b68c52970b7871ad1582e74f5
Author: Xin Liao <li...@126.com>
AuthorDate: Wed Dec 21 09:50:13 2022 +0800
[enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
Investigating data inconsistency across multiple replicas after the fact is very difficult.
When loading data, the number of rows received by each replica of a tablet is now compared, so that some data inconsistency problems are caught at load time instead of being discovered later.
---
be/src/exec/tablet_sink.cpp | 41 +++++++++++++++++++++++++++++++
be/src/exec/tablet_sink.h | 11 +++++++++
be/src/olap/delta_writer.cpp | 10 ++++++++
be/src/olap/delta_writer.h | 7 ++++++
be/src/olap/memtable.cpp | 2 ++
be/src/olap/memtable.h | 2 ++
be/src/olap/rowset/beta_rowset_writer.cpp | 1 +
be/src/vec/sink/vtablet_sink.cpp | 4 +++
gensrc/proto/internal_service.proto | 2 ++
9 files changed, 80 insertions(+)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 6e835228da..106623fa93 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -247,6 +247,10 @@ Status NodeChannel::open_wait() {
commit_info.tabletId = tablet.tablet_id();
commit_info.backendId = _node_id;
_tablet_commit_infos.emplace_back(std::move(commit_info));
+ if (tablet.has_received_rows()) { // received_rows is an optional proto field; skip if unset
+ _tablets_received_rows.emplace_back(tablet.tablet_id(),
+ tablet.received_rows());
+ }
VLOG_CRITICAL
<< "master replica commit info: tabletId=" << tablet.tablet_id()
<< ", backendId=" << _node_id
@@ -457,6 +461,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
std::make_move_iterator(_tablet_commit_infos.end()));
_index_channel->set_error_tablet_in_state(state);
+ _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
return Status::OK();
}
@@ -769,6 +774,39 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
}
}
+void IndexChannel::set_tablets_received_rows(
+ const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) {
+ for (const auto& [tablet_id, rows_num] : tablets_received_rows) {
+ _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num);
+ }
+}
+
+Status IndexChannel::check_tablet_received_rows_consistency() {
+ for (const auto& tablet : _tablets_received_rows) {
+ for (size_t i = 0; i < tablet.second.size(); i++) {
+ VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id
+ << ", txn_id: " << std::to_string(_parent->_txn_id)
+ << ", tablet_id: " << tablet.first
+ << ", node_id: " << tablet.second[i].first
+ << ", rows_num: " << tablet.second[i].second;
+ if (i == 0) {
+ continue;
+ }
+ if (tablet.second[i].second != tablet.second[0].second) { // compare with the first reported replica
+ LOG(WARNING) << "rows num doesn't match, load_id: " << _parent->_load_id
+ << ", txn_id: " << std::to_string(_parent->_txn_id)
+ << ", tablet_id: " << tablet.first
+ << ", node_id: " << tablet.second[i].first
+ << ", rows_num: " << tablet.second[i].second
+ << ", first replica node_id: " << tablet.second[0].first
+ << ", first replica rows_num: " << tablet.second[0].second;
+ return Status::InternalError("rows num written by multi replicas doesn't match");
+ }
+ }
+ }
+ return Status::OK();
+}
+
OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
: _pool(pool),
@@ -1155,6 +1193,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
Status index_st = index_channel->check_intolerable_failure();
if (!index_st.ok()) {
status = index_st;
+ } else if (Status st = index_channel->check_tablet_received_rows_consistency();
+ !st.ok()) {
+ status = st;
}
} // end for index channels
}
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index a177db8a9b..2216c89fff 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -321,6 +321,8 @@ protected:
bool _is_closed = false;
RuntimeState* _state;
+ // rows number received per tablet on this channel's node, as (tablet_id, rows_num) pairs
+ std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
private:
std::unique_ptr<RowBatch> _cur_batch;
@@ -368,6 +370,12 @@ public:
return mem_consumption;
}
+ void set_tablets_received_rows( // record the (tablet_id, rows_num) pairs reported by node_id
+ const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+
+ // check whether the rows num written by different replicas is consistent
+ Status check_tablet_received_rows_consistency();
+
private:
friend class NodeChannel;
friend class VNodeChannel;
@@ -398,6 +406,9 @@ private:
Status _intolerable_failure_status = Status::OK();
std::unique_ptr<MemTracker> _index_channel_tracker;
+ // rows num received by DeltaWriter per tablet, tablet_id -> [(node_id, rows_num), ...]
+ // used to verify whether the rows num received by different replicas is consistent
+ std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
};
template <typename Row>
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 6b0cfb3e10..2a0fded006 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -167,6 +167,7 @@ Status DeltaWriter::write(Tuple* tuple) {
return _cancel_status;
}
+ _total_received_rows++;
_mem_table->insert(tuple);
// if memtable is full, push it to the flush executor,
@@ -192,6 +193,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
return _cancel_status;
}
+ _total_received_rows += row_idxs.size();
for (const auto& row_idx : row_idxs) {
_mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
}
@@ -220,6 +222,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
return _cancel_status;
}
+ _total_received_rows += row_idxs.size();
_mem_table->insert(block, row_idxs);
if (_mem_table->need_to_agg()) {
@@ -237,6 +240,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
Status DeltaWriter::_flush_memtable_async() {
+ _merged_rows += _mem_table->merged_rows(); // must be read before _mem_table is moved below
return _flush_token->submit(std::move(_mem_table));
}
@@ -358,6 +362,12 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
_mem_table.reset();
+ if (_rowset_writer->num_rows() + _merged_rows != _total_received_rows) { // written + merged must equal received
+ LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
+ << _rowset_writer->num_rows() << ", merged_rows: " << _merged_rows
+ << ", total received rows: " << _total_received_rows;
+ return Status::InternalError("rows number written by delta writer doesn't match");
+ }
// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
if (_cur_rowset == nullptr) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 5484c7c7db..dafd77a8f8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -110,6 +110,8 @@ public:
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+ int64_t total_received_rows() const { return _total_received_rows; }
+
private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
bool is_vec);
@@ -172,6 +174,11 @@ private:
RowsetIdUnorderedSet _rowset_ids;
// current max version, used to calculate delete bitmap
int64_t _cur_max_version;
+
+ // total rows num received by write(), counted before any memtable merge
+ int64_t _total_received_rows = 0;
+ // rows num merged away by memtables, accumulated when each memtable is submitted for flush
+ int64_t _merged_rows = 0;
};
} // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3881ce8dce..63575a3bb2 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -220,6 +220,7 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint);
if (is_exist) {
+ _merged_rows++; // key already present: row is aggregated into it, not inserted
_aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key);
} else {
row_in_block->init_agg_places(
@@ -249,6 +250,7 @@ void MemTable::_insert_agg(const Tuple* tuple) {
bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
if (is_exist) {
+ _merged_rows++; // key already present: row is aggregated into it, not inserted
(this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
} else {
tuple_buf = _table_mem_pool->allocate(_schema_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index e7f59ff151..27606f3552 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -70,6 +70,7 @@ public:
Status close();
int64_t flush_size() const { return _flush_size; }
+ int64_t merged_rows() const { return _merged_rows; } // inserted rows that were merged into an existing key
private:
Status _do_flush(int64_t& duration_ns);
@@ -202,6 +203,7 @@ private:
// This is not the rows in this memtable, because rows may be merged
// in unique or aggregate key model.
int64_t _rows = 0;
+ int64_t _merged_rows = 0; // rows folded into an existing skiplist key on insert
void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
TableKey row_in_skiplist) = nullptr;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8da5ca117a..e6ee1833b4 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -661,6 +661,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
}
ContiguousRow dst_row = it.get_current_row();
auto s = writer->append_row(dst_row);
+ _raw_num_rows_written++; // NOTE(review): counted before `s` is checked; a failed append returns an error just below
if (PREDICT_FALSE(!s.ok())) {
LOG(WARNING) << "failed to append row: " << s.to_string();
return Status::Error<WRITER_DATA_WRITE_ERROR>();
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index de59e0b5a7..ae799e46d9 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -135,6 +135,10 @@ Status VNodeChannel::open_wait() {
commit_info.tabletId = tablet.tablet_id();
commit_info.backendId = _node_id;
_tablet_commit_infos.emplace_back(std::move(commit_info));
+ if (tablet.has_received_rows()) { // received_rows is an optional proto field; skip if unset
+ _tablets_received_rows.emplace_back(tablet.tablet_id(),
+ tablet.received_rows());
+ }
VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
<< ", backendId=" << _node_id
<< ", master node id: " << this->node_id()
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index ff05509e67..552786f313 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -65,6 +65,8 @@ message PTabletInfo {
// Delta Writer will write data to local disk and then check if there are new raw values not in global dict
// if appears, then it should add the column name to this vector
repeated string invalid_dict_cols = 3;
+ // total rows num received by DeltaWriter; compared across replicas of a tablet to detect inconsistency
+ optional int64 received_rows = 4;
}
// open a tablet writer
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org