You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/22 08:10:59 UTC

[doris] branch branch-1.2-lts updated (258db947af -> 7c1a4b80b0)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 258db947af [bug](jdbc) fix error of jdbc with datetime type in oracle (#15205)
     new 98f2824796 [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
     new e67d07ff44 [Bug](runtimefilter) Fix BE crash due to init failure (#15228)
     new 7c1a4b80b0 [fix](InBitmap) Check whether the in bitmap contains correlated subqueries (#15184)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/tablet_sink.cpp                        | 41 ++++++++++++++++++++++
 be/src/exec/tablet_sink.h                          | 11 ++++++
 be/src/olap/delta_writer.cpp                       | 10 ++++++
 be/src/olap/delta_writer.h                         |  7 ++++
 be/src/olap/memtable.cpp                           |  2 ++
 be/src/olap/memtable.h                             |  2 ++
 be/src/olap/rowset/beta_rowset_writer.cpp          |  1 +
 be/src/vec/exec/scan/vscan_node.cpp                |  8 ++---
 be/src/vec/exec/scan/vscan_node.h                  |  1 +
 be/src/vec/sink/vtablet_sink.cpp                   |  4 +++
 .../org/apache/doris/analysis/StmtRewriter.java    |  4 +++
 gensrc/proto/internal_service.proto                |  2 ++
 .../suites/query_p0/join/test_bitmap_filter.groovy | 10 ++++--
 13 files changed, 96 insertions(+), 7 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/03: [fix](InBitmap) Check whether the in bitmap contains correlated subqueries (#15184)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7c1a4b80b0f10d77aadd1874121c2fe3372ed611
Author: luozenglin <37...@users.noreply.github.com>
AuthorDate: Wed Dec 21 16:52:27 2022 +0800

    [fix](InBitmap) Check whether the in bitmap contains correlated subqueries (#15184)
---
 .../src/main/java/org/apache/doris/analysis/StmtRewriter.java  |  4 ++++
 regression-test/suites/query_p0/join/test_bitmap_filter.groovy | 10 +++++++---
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index bb572b8aca..92be4011e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -1113,6 +1113,10 @@ public class StmtRewriter {
         Expr subquerySubstitute = slotRef;
         if (exprWithSubquery instanceof InPredicate) {
             if (slotRef.getType().isBitmapType()) {
+                if (isCorrelated) {
+                    throw new AnalysisException(
+                            "In bitmap does not support correlated subquery: " + exprWithSubquery.toSql());
+                }
                 Expr pred = new BitmapFilterPredicate(exprWithSubquery.getChild(0), slotRef,
                         ((InPredicate) exprWithSubquery).isNotIn());
                 pred.analyze(analyzer);
diff --git a/regression-test/suites/query_p0/join/test_bitmap_filter.groovy b/regression-test/suites/query_p0/join/test_bitmap_filter.groovy
index d6770f2ce9..0292c5c93a 100644
--- a/regression-test/suites/query_p0/join/test_bitmap_filter.groovy
+++ b/regression-test/suites/query_p0/join/test_bitmap_filter.groovy
@@ -16,13 +16,12 @@
 // under the License.
 
 suite("test_bitmap_filter", "query_p0") {
-    def tbl1 = "bigtable"
+    def tbl1 = "test_query_db.bigtable"
     def tbl2 = "bitmap_table"
-    def tbl3 = "baseall"
+    def tbl3 = "test_query_db.baseall"
 
     sql "set runtime_filter_type = 16"
     sql "set enable_vectorized_engine = true"
-    sql "use test_query_db"
     sql "DROP TABLE IF EXISTS ${tbl2}"
     sql """
     CREATE TABLE ${tbl2} (
@@ -51,4 +50,9 @@ suite("test_bitmap_filter", "query_p0") {
     qt_sql5 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) and k2 not in (select k3 from ${tbl2}) order by k1;"
 
     qt_sql6 "select k2, count(k2) from ${tbl1} where k1 in (select k2 from ${tbl2}) group by k2 order by k2;"
+
+    test {
+        sql "select k1, k2 from ${tbl1} b1 where k1 in (select k2 from ${tbl2} b2 where b1.k2 = b2.k1) order by k1;"
+        exception "In bitmap does not support correlated subquery"
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/03: [Bug](runtimefilter) Fix BE crash due to init failure (#15228)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e67d07ff4440d8a9e9d5fb9a7682ef678afb4a65
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Dec 21 15:36:22 2022 +0800

    [Bug](runtimefilter) Fix BE crash due to init failure (#15228)
---
 be/src/vec/exec/scan/vscan_node.cpp | 8 ++++----
 be/src/vec/exec/scan/vscan_node.h   | 1 +
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index fb3ee2c525..cbd3a9e592 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -186,8 +186,8 @@ Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
 
 Status VScanNode::_register_runtime_filter() {
     int filter_size = _runtime_filter_descs.size();
-    _runtime_filter_ctxs.resize(filter_size);
-    _runtime_filter_ready_flag.resize(filter_size);
+    _runtime_filter_ctxs.reserve(filter_size);
+    _runtime_filter_ready_flag.reserve(filter_size);
     for (int i = 0; i < filter_size; ++i) {
         IRuntimeFilter* runtime_filter = nullptr;
         const auto& filter_desc = _runtime_filter_descs[i];
@@ -195,8 +195,8 @@ Status VScanNode::_register_runtime_filter() {
                 RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id()));
         RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
                                                                          &runtime_filter));
-        _runtime_filter_ctxs[i].runtime_filter = runtime_filter;
-        _runtime_filter_ready_flag[i] = false;
+        _runtime_filter_ctxs.emplace_back(runtime_filter);
+        _runtime_filter_ready_flag.emplace_back(false);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index c5b58b3d36..8abd4b5e19 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -172,6 +172,7 @@ protected:
     // For runtime filters
     struct RuntimeFilterContext {
         RuntimeFilterContext() : apply_mark(false), runtime_filter(nullptr) {}
+        RuntimeFilterContext(IRuntimeFilter* rf) : apply_mark(false), runtime_filter(rf) {}
         // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
         bool apply_mark;
         IRuntimeFilter* runtime_filter;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/03: [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 98f2824796ce25176a2379e52596a52238e1a6de
Author: Xin Liao <li...@126.com>
AuthorDate: Wed Dec 21 09:50:13 2022 +0800

    [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101)
    
    Data inconsistency between multiple replicas is very difficult to investigate after the fact.
    When loading data, the number of rows received by each replica is now checked, so that this kind of data inconsistency is detected at load time.
---
 be/src/exec/tablet_sink.cpp               | 41 +++++++++++++++++++++++++++++++
 be/src/exec/tablet_sink.h                 | 11 +++++++++
 be/src/olap/delta_writer.cpp              | 10 ++++++++
 be/src/olap/delta_writer.h                |  7 ++++++
 be/src/olap/memtable.cpp                  |  2 ++
 be/src/olap/memtable.h                    |  2 ++
 be/src/olap/rowset/beta_rowset_writer.cpp |  1 +
 be/src/vec/sink/vtablet_sink.cpp          |  4 +++
 gensrc/proto/internal_service.proto       |  2 ++
 9 files changed, 80 insertions(+)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 7c0051c191..cbeeace20a 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -247,6 +247,10 @@ Status NodeChannel::open_wait() {
                         commit_info.tabletId = tablet.tablet_id();
                         commit_info.backendId = _node_id;
                         _tablet_commit_infos.emplace_back(std::move(commit_info));
+                        if (tablet.has_received_rows()) {
+                            _tablets_received_rows.emplace_back(tablet.tablet_id(),
+                                                                tablet.received_rows());
+                        }
                         VLOG_CRITICAL
                                 << "master replica commit info: tabletId=" << tablet.tablet_id()
                                 << ", backendId=" << _node_id
@@ -457,6 +461,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
                                             std::make_move_iterator(_tablet_commit_infos.end()));
 
         _index_channel->set_error_tablet_in_state(state);
+        _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
         return Status::OK();
     }
 
@@ -769,6 +774,39 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
     }
 }
 
+void IndexChannel::set_tablets_received_rows(
+        const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) {
+    for (const auto& [tablet_id, rows_num] : tablets_received_rows) {
+        _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num);
+    }
+}
+
+Status IndexChannel::check_tablet_received_rows_consistency() {
+    for (auto& tablet : _tablets_received_rows) {
+        for (size_t i = 0; i < tablet.second.size(); i++) {
+            VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id
+                        << ", txn_id: " << std::to_string(_parent->_txn_id)
+                        << ", tablet_id: " << tablet.first
+                        << ", node_id: " << tablet.second[i].first
+                        << ", rows_num: " << tablet.second[i].second;
+            if (i == 0) {
+                continue;
+            }
+            if (tablet.second[i].second != tablet.second[0].second) {
+                LOG(WARNING) << "rows num doest't match, load_id: " << _parent->_load_id
+                             << ", txn_id: " << std::to_string(_parent->_txn_id)
+                             << ", tablt_id: " << tablet.first
+                             << ", node_id: " << tablet.second[i].first
+                             << ", rows_num: " << tablet.second[i].second
+                             << ", node_id: " << tablet.second[0].first
+                             << ", rows_num: " << tablet.second[0].second;
+                return Status::InternalError("rows num written by multi replicas doest't match");
+            }
+        }
+    }
+    return Status::OK();
+}
+
 OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                              const std::vector<TExpr>& texprs, Status* status)
         : _pool(pool),
@@ -1155,6 +1193,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                 Status index_st = index_channel->check_intolerable_failure();
                 if (!index_st.ok()) {
                     status = index_st;
+                } else if (Status st = index_channel->check_tablet_received_rows_consistency();
+                           !st.ok()) {
+                    status = st;
                 }
             } // end for index channels
         }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index fb820d6c42..e6973f7b7a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -321,6 +321,8 @@ protected:
     bool _is_closed = false;
 
     RuntimeState* _state;
+    // rows number received per tablet, tablet_id -> rows_num
+    std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows;
 
 private:
     std::unique_ptr<RowBatch> _cur_batch;
@@ -368,6 +370,12 @@ public:
         return mem_consumption;
     }
 
+    void set_tablets_received_rows(
+            const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id);
+
+    // check whether the rows num written by different replicas is consistent
+    Status check_tablet_received_rows_consistency();
+
 private:
     friend class NodeChannel;
     friend class VNodeChannel;
@@ -398,6 +406,9 @@ private:
     Status _intolerable_failure_status = Status::OK();
 
     std::unique_ptr<MemTracker> _index_channel_tracker;
+    // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num>
+    // used to verify whether the rows num received by different replicas is consistent
+    std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows;
 };
 
 template <typename Row>
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 75f402f202..09afa54be9 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -163,6 +163,7 @@ Status DeltaWriter::write(Tuple* tuple) {
         return _cancel_status;
     }
 
+    _total_received_rows++;
     _mem_table->insert(tuple);
 
     // if memtable is full, push it to the flush executor,
@@ -188,6 +189,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
         return _cancel_status;
     }
 
+    _total_received_rows += row_idxs.size();
     for (const auto& row_idx : row_idxs) {
         _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
     }
@@ -216,6 +218,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         return _cancel_status;
     }
 
+    _total_received_rows += row_idxs.size();
     _mem_table->insert(block, row_idxs);
 
     if (_mem_table->need_to_agg()) {
@@ -233,6 +236,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
 }
 
 Status DeltaWriter::_flush_memtable_async() {
+    _merged_rows += _mem_table->merged_rows();
     return _flush_token->submit(std::move(_mem_table));
 }
 
@@ -354,6 +358,12 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
 
     _mem_table.reset();
 
+    if (_rowset_writer->num_rows() + _merged_rows != _total_received_rows) {
+        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
+                     << _rowset_writer->num_rows() << ", merged_rows: " << _merged_rows
+                     << ", total received rows: " << _total_received_rows;
+        return Status::InternalError("rows number written by delta writer dosen't match");
+    }
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
     if (_cur_rowset == nullptr) {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 5484c7c7db..dafd77a8f8 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -110,6 +110,8 @@ public:
 
     void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
+    int64_t total_received_rows() const { return _total_received_rows; }
+
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
                 bool is_vec);
@@ -172,6 +174,11 @@ private:
     RowsetIdUnorderedSet _rowset_ids;
     // current max version, used to calculate delete bitmap
     int64_t _cur_max_version;
+
+    // total rows num written by DeltaWriter
+    int64_t _total_received_rows = 0;
+    // rows num merged by memtable
+    int64_t _merged_rows = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ab1a0f5209..46cc15275e 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -219,6 +219,7 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
 
     bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint);
     if (is_exist) {
+        _merged_rows++;
         _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key);
     } else {
         row_in_block->init_agg_places(
@@ -248,6 +249,7 @@ void MemTable::_insert_agg(const Tuple* tuple) {
 
     bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
+        _merged_rows++;
         (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
         tuple_buf = _table_mem_pool->allocate(_schema_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index e7f59ff151..27606f3552 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -70,6 +70,7 @@ public:
     Status close();
 
     int64_t flush_size() const { return _flush_size; }
+    int64_t merged_rows() const { return _merged_rows; }
 
 private:
     Status _do_flush(int64_t& duration_ns);
@@ -202,6 +203,7 @@ private:
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggregate key model.
     int64_t _rows = 0;
+    int64_t _merged_rows = 0;
     void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
     void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
                                             TableKey row_in_skiplist) = nullptr;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 687e8bac35..d2277706a7 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -660,6 +660,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus
         }
         ContiguousRow dst_row = it.get_current_row();
         auto s = writer->append_row(dst_row);
+        _raw_num_rows_written++;
         if (PREDICT_FALSE(!s.ok())) {
             LOG(WARNING) << "failed to append row: " << s.to_string();
             return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index a9461e1659..7d01bf3105 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -135,6 +135,10 @@ Status VNodeChannel::open_wait() {
                     commit_info.tabletId = tablet.tablet_id();
                     commit_info.backendId = _node_id;
                     _tablet_commit_infos.emplace_back(std::move(commit_info));
+                    if (tablet.has_received_rows()) {
+                        _tablets_received_rows.emplace_back(tablet.tablet_id(),
+                                                            tablet.received_rows());
+                    }
                     VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id()
                                   << ", backendId=" << _node_id
                                   << ", master node id: " << this->node_id()
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index bf61b22707..0396e8944b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -65,6 +65,8 @@ message PTabletInfo {
     // Delta Writer will write data to local disk and then check if there are new raw values not in global dict
     // if appears, then it should add the column name to this vector
     repeated string invalid_dict_cols = 3; 
+    // total rows num received by DeltaWriter
+    optional int64 received_rows = 4;
 }
 
 // open a tablet writer


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org