You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/22 03:46:03 UTC

[incubator-doris] branch master updated: [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 31e40191a8 [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)
31e40191a8 is described below

commit 31e40191a8ba7e5295258e9be61e91c8ffb2fa33
Author: xiepengcheng01 <10...@users.noreply.github.com>
AuthorDate: Sun May 22 11:45:57 2022 +0800

    [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)
---
 be/src/exec/base_scanner.cpp                       |  28 +++---
 be/src/exec/base_scanner.h                         |   4 +-
 be/test/vec/exec/vbroker_scanner_test.cpp          | 103 ++++++++++++++++++++-
 .../org/apache/doris/planner/LoadScanNode.java     |   9 +-
 .../java/org/apache/doris/planner/PlanNode.java    |   7 ++
 gensrc/thrift/PlanNodes.thrift                     |   2 +-
 6 files changed, 129 insertions(+), 24 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index e06b52de2f..005e64c703 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -130,11 +130,13 @@ Status BaseScanner::init_expr_ctxes() {
     // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
     if (!_pre_filter_texprs.empty()) {
         if (_state->enable_vectorized_exec()) {
-            RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
-                    _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
-            RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
-                                                       _mem_tracker));
-            RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
+            // In vectorized mode, FE is expected to have combined all preceding filter exprs into one compound expr.
+            DCHECK(_pre_filter_texprs.size() == 1);
+            _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
+                    _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
+            RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker));
+            RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
         } else {
             RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
                                                     &_pre_filter_ctxs));
@@ -302,14 +304,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
 Status BaseScanner::_filter_src_block() {
     auto origin_column_num = _src_block.columns();
     // filter block
-    if (!_vpre_filter_ctxs.empty()) {
-        for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
-            auto old_rows = _src_block.rows();
-            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
-                                                                   origin_column_num));
-            _counter->num_rows_unselected += old_rows - _src_block.rows();
-        }
-    }
+    auto old_rows = _src_block.rows();
+    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block,
+                                                           origin_column_num));
+    _counter->num_rows_unselected += old_rows - _src_block.rows();
     return Status::OK();
 }
 
@@ -453,8 +451,8 @@ void BaseScanner::close() {
         Expr::close(_pre_filter_ctxs, _state);
     }
 
-    if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
-        vectorized::VExpr::close(_vpre_filter_ctxs, _state);
+    if (_vpre_filter_ctx_ptr) {
+        (*_vpre_filter_ctx_ptr)->close(_state);
     }
 }
 
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 1c2ce211b5..fe3e088d4e 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -62,7 +62,7 @@ public:
         if (_state->enable_vectorized_exec()) {
             vectorized::VExpr::close(_dest_vexpr_ctx, _state);
         }
-    };
+    }
 
     virtual Status init_expr_ctxes();
     // Open this scanner, will initialize information need to
@@ -138,7 +138,7 @@ protected:
 
     // for vectorized load
     std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
-    std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
+    std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
     vectorized::Block _src_block;
     int _num_of_columns_from_file;
 
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 713aefc4a7..428d82343c 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -363,7 +363,6 @@ TEST_F(VBrokerScannerTest, normal) {
     range.file_type = TFileType::FILE_LOCAL;
     range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
     ranges.push_back(range);
-
     VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
                            &_counter);
     auto st = scanner.open();
@@ -376,7 +375,6 @@ TEST_F(VBrokerScannerTest, normal) {
     ASSERT_TRUE(eof);
     auto columns = block->get_columns();
     ASSERT_EQ(columns.size(), 3);
-
     ASSERT_EQ(columns[0]->get_int(0), 1);
     ASSERT_EQ(columns[0]->get_int(1), 4);
     ASSERT_EQ(columns[0]->get_int(2), 8);
@@ -390,6 +388,105 @@ TEST_F(VBrokerScannerTest, normal) {
     ASSERT_EQ(columns[2]->get_int(2), 10);
 }
 
+TEST_F(VBrokerScannerTest, normal_with_pre_filter) {
+    std::vector<TBrokerRangeDesc> ranges;
+    TBrokerRangeDesc range;
+    range.path = "./be/test/exec/test_data/broker_scanner/normal.csv";
+    range.start_offset = 0;
+    range.size = -1;
+    range.splittable = true;
+    range.file_type = TFileType::FILE_LOCAL;
+    range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+    ranges.push_back(range);
+
+    // Build the pre-filter predicate k1 < '8' as a thrift expr: the LT node, then slot ref, then literal.
+    TTypeDesc int_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::INT);
+        node.__set_scalar_type(scalar_type);
+        int_type.types.push_back(node);
+    }
+    TTypeDesc varchar_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::VARCHAR);
+        scalar_type.__set_len(5000);
+        node.__set_scalar_type(scalar_type);
+        varchar_type.types.push_back(node);
+    }
+
+    TExpr filter_expr;
+    {
+        TExprNode expr_node;
+        expr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+        expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN);
+        expr_node.__set_num_children(2);
+        expr_node.__isset.opcode = true;
+        expr_node.__set_opcode(TExprOpcode::LT);
+        expr_node.__isset.vector_opcode = true;
+        expr_node.__set_vector_opcode(TExprOpcode::LT);
+        expr_node.__isset.fn = true;
+        expr_node.fn.name.function_name = "lt";
+        expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN;
+        expr_node.fn.ret_type = int_type;
+        expr_node.fn.has_var_args = false;
+        filter_expr.nodes.push_back(expr_node);
+    }
+    {
+        TExprNode expr_node;
+        expr_node.__set_node_type(TExprNodeType::SLOT_REF);
+        expr_node.type = varchar_type;
+        expr_node.__set_num_children(0);
+        expr_node.__isset.slot_ref = true;
+        TSlotRef slot_ref;
+        slot_ref.__set_slot_id(4);
+        slot_ref.__set_tuple_id(1);
+        expr_node.__set_slot_ref(slot_ref);
+        expr_node.__isset.output_column = true;
+        expr_node.__set_output_column(0);
+        filter_expr.nodes.push_back(expr_node);
+    }
+    {
+        TExprNode expr_node;
+        expr_node.__set_node_type(TExprNodeType::STRING_LITERAL);
+        expr_node.type = varchar_type;
+        expr_node.__set_num_children(0);
+        expr_node.__isset.string_literal = true;
+        TStringLiteral string_literal;
+        string_literal.__set_value("8");
+        expr_node.__set_string_literal(string_literal);
+        filter_expr.nodes.push_back(expr_node);
+    }
+    _pre_filter.push_back(filter_expr);
+    VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
+                           &_counter);
+    auto st = scanner.open();
+    ASSERT_TRUE(st.ok());
+
+    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+    bool eof = false;
+    // a single get_next() call is expected to succeed and report eof
+    st = scanner.get_next(block.get(), &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+    auto columns = block->get_columns();
+    ASSERT_EQ(columns.size(), 3);
+
+    ASSERT_EQ(columns[0]->get_int(0), 1);
+    ASSERT_EQ(columns[0]->get_int(1), 4);
+
+    ASSERT_EQ(columns[1]->get_int(0), 2);
+    ASSERT_EQ(columns[1]->get_int(1), 5);
+
+    ASSERT_EQ(columns[2]->get_int(0), 3);
+    ASSERT_EQ(columns[2]->get_int(1), 6);
+}
+
 TEST_F(VBrokerScannerTest, normal2) {
     std::vector<TBrokerRangeDesc> ranges;
 
@@ -406,7 +503,6 @@ TEST_F(VBrokerScannerTest, normal2) {
     range.start_offset = 0;
     range.size = 4;
     ranges.push_back(range);
-
     VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
                            &_counter);
     auto st = scanner.open();
@@ -440,7 +536,6 @@ TEST_F(VBrokerScannerTest, normal5) {
     range.file_type = TFileType::FILE_LOCAL;
     range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
     ranges.push_back(range);
-
     VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
                            &_counter);
     auto st = scanner.open();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 9ca69c819c..9f063a47ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -213,8 +214,12 @@ public abstract class LoadScanNode extends ScanNode {
         planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
         TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
         if (!preFilterConjuncts.isEmpty()) {
-            for (Expr e : preFilterConjuncts) {
-                brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+            if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
+                brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
+            } else {
+                for (Expr e : preFilterConjuncts) {
+                    brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+                }
             }
         }
         planNode.setBrokerScanNode(brokerScanNode);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index d43747ed11..f1bef30896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     //  4. Filter data by using "conjuncts".
     protected List<Expr> preFilterConjuncts = Lists.newArrayList();
 
+    protected Expr vpreFilterConjunct = null;
+
     // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
     // assigned to a fragment. Set and maintained by enclosing PlanFragment.
     protected PlanFragment fragment;
@@ -904,6 +906,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
             initCompoundPredicate(vconjunct);
         }
 
+        if (!preFilterConjuncts.isEmpty()) {
+            vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
+            initCompoundPredicate(vpreFilterConjunct);
+        }
+
         for (PlanNode child : children) {
             child.convertToVectoriezd();
         }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 94168a9074..d4d37e11ac 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -252,7 +252,7 @@ struct TBrokerScanNode {
     // Partition info used to process partition select in broker load
     2: optional list<Exprs.TExpr> partition_exprs
     3: optional list<Partitions.TRangePartition> partition_infos
-	4: optional list<Exprs.TExpr> pre_filter_exprs
+    4: optional list<Exprs.TExpr> pre_filter_exprs
 }
 
 struct TEsScanNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org