You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/22 03:46:03 UTC
[incubator-doris] branch master updated: [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31e40191a8 [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)
31e40191a8 is described below
commit 31e40191a8ba7e5295258e9be61e91c8ffb2fa33
Author: xiepengcheng01 <10...@users.noreply.github.com>
AuthorDate: Sun May 22 11:45:57 2022 +0800
[Refactor] add vpre_filter_expr for vectorized to improve performance (#9508)
---
be/src/exec/base_scanner.cpp | 28 +++---
be/src/exec/base_scanner.h | 4 +-
be/test/vec/exec/vbroker_scanner_test.cpp | 103 ++++++++++++++++++++-
.../org/apache/doris/planner/LoadScanNode.java | 9 +-
.../java/org/apache/doris/planner/PlanNode.java | 7 ++
gensrc/thrift/PlanNodes.thrift | 2 +-
6 files changed, 129 insertions(+), 24 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index e06b52de2f..005e64c703 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -130,11 +130,13 @@ Status BaseScanner::init_expr_ctxes() {
// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
if (_state->enable_vectorized_exec()) {
- RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
- _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
- _mem_tracker));
- RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
+ // for vectorized, preceding filter exprs should be compounded to one passed from fe.
+ DCHECK(_pre_filter_texprs.size() == 1);
+ _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
+ _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
+ RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker));
+ RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
} else {
RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
&_pre_filter_ctxs));
@@ -302,14 +304,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
- if (!_vpre_filter_ctxs.empty()) {
- for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
- auto old_rows = _src_block.rows();
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
- origin_column_num));
- _counter->num_rows_unselected += old_rows - _src_block.rows();
- }
- }
+ auto old_rows = _src_block.rows();
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block,
+ origin_column_num));
+ _counter->num_rows_unselected += old_rows - _src_block.rows();
return Status::OK();
}
@@ -453,8 +451,8 @@ void BaseScanner::close() {
Expr::close(_pre_filter_ctxs, _state);
}
- if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
- vectorized::VExpr::close(_vpre_filter_ctxs, _state);
+ if (_vpre_filter_ctx_ptr) {
+ (*_vpre_filter_ctx_ptr)->close(_state);
}
}
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 1c2ce211b5..fe3e088d4e 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -62,7 +62,7 @@ public:
if (_state->enable_vectorized_exec()) {
vectorized::VExpr::close(_dest_vexpr_ctx, _state);
}
- };
+ }
virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
@@ -138,7 +138,7 @@ protected:
// for vectorized load
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
- std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
+ std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
vectorized::Block _src_block;
int _num_of_columns_from_file;
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 713aefc4a7..428d82343c 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -363,7 +363,6 @@ TEST_F(VBrokerScannerTest, normal) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
-
VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
@@ -376,7 +375,6 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_TRUE(eof);
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);
-
ASSERT_EQ(columns[0]->get_int(0), 1);
ASSERT_EQ(columns[0]->get_int(1), 4);
ASSERT_EQ(columns[0]->get_int(2), 8);
@@ -390,6 +388,105 @@ TEST_F(VBrokerScannerTest, normal) {
ASSERT_EQ(columns[2]->get_int(2), 10);
}
+TEST_F(VBrokerScannerTest, normal_with_pre_filter) {
+ std::vector<TBrokerRangeDesc> ranges;
+ TBrokerRangeDesc range;
+ range.path = "./be/test/exec/test_data/broker_scanner/normal.csv";
+ range.start_offset = 0;
+ range.size = -1;
+ range.splittable = true;
+ range.file_type = TFileType::FILE_LOCAL;
+ range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+ ranges.push_back(range);
+
+ // init pre_filter expr: k1 < '8'
+ TTypeDesc int_type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::INT);
+ node.__set_scalar_type(scalar_type);
+ int_type.types.push_back(node);
+ }
+ TTypeDesc varchar_type;
+ {
+ TTypeNode node;
+ node.__set_type(TTypeNodeType::SCALAR);
+ TScalarType scalar_type;
+ scalar_type.__set_type(TPrimitiveType::VARCHAR);
+ scalar_type.__set_len(5000);
+ node.__set_scalar_type(scalar_type);
+ varchar_type.types.push_back(node);
+ }
+
+ TExpr filter_expr;
+ {
+ TExprNode expr_node;
+ expr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+ expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN);
+ expr_node.__set_num_children(2);
+ expr_node.__isset.opcode = true;
+ expr_node.__set_opcode(TExprOpcode::LT);
+ expr_node.__isset.vector_opcode = true;
+ expr_node.__set_vector_opcode(TExprOpcode::LT);
+ expr_node.__isset.fn = true;
+ expr_node.fn.name.function_name = "lt";
+ expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN;
+ expr_node.fn.ret_type = int_type;
+ expr_node.fn.has_var_args = false;
+ filter_expr.nodes.push_back(expr_node);
+ }
+ {
+ TExprNode expr_node;
+ expr_node.__set_node_type(TExprNodeType::SLOT_REF);
+ expr_node.type = varchar_type;
+ expr_node.__set_num_children(0);
+ expr_node.__isset.slot_ref = true;
+ TSlotRef slot_ref;
+ slot_ref.__set_slot_id(4);
+ slot_ref.__set_tuple_id(1);
+ expr_node.__set_slot_ref(slot_ref);
+ expr_node.__isset.output_column = true;
+ expr_node.__set_output_column(0);
+ filter_expr.nodes.push_back(expr_node);
+ }
+ {
+ TExprNode expr_node;
+ expr_node.__set_node_type(TExprNodeType::STRING_LITERAL);
+ expr_node.type = varchar_type;
+ expr_node.__set_num_children(0);
+ expr_node.__isset.string_literal = true;
+ TStringLiteral string_literal;
+ string_literal.__set_value("8");
+ expr_node.__set_string_literal(string_literal);
+ filter_expr.nodes.push_back(expr_node);
+ }
+ _pre_filter.push_back(filter_expr);
+ VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
+ &_counter);
+ auto st = scanner.open();
+ ASSERT_TRUE(st.ok());
+
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
+ bool eof = false;
+ // end of file
+ st = scanner.get_next(block.get(), &eof);
+ ASSERT_TRUE(st.ok());
+ ASSERT_TRUE(eof);
+ auto columns = block->get_columns();
+ ASSERT_EQ(columns.size(), 3);
+
+ ASSERT_EQ(columns[0]->get_int(0), 1);
+ ASSERT_EQ(columns[0]->get_int(1), 4);
+
+ ASSERT_EQ(columns[1]->get_int(0), 2);
+ ASSERT_EQ(columns[1]->get_int(1), 5);
+
+ ASSERT_EQ(columns[2]->get_int(0), 3);
+ ASSERT_EQ(columns[2]->get_int(1), 6);
+}
+
TEST_F(VBrokerScannerTest, normal2) {
std::vector<TBrokerRangeDesc> ranges;
@@ -406,7 +503,6 @@ TEST_F(VBrokerScannerTest, normal2) {
range.start_offset = 0;
range.size = 4;
ranges.push_back(range);
-
VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
@@ -440,7 +536,6 @@ TEST_F(VBrokerScannerTest, normal5) {
range.file_type = TFileType::FILE_LOCAL;
range.format_type = TFileFormatType::FORMAT_CSV_PLAIN;
ranges.push_back(range);
-
VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter,
&_counter);
auto st = scanner.open();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 9ca69c819c..9f063a47ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.rewrite.ExprRewriter;
@@ -213,8 +214,12 @@ public abstract class LoadScanNode extends ScanNode {
planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE);
TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt());
if (!preFilterConjuncts.isEmpty()) {
- for (Expr e : preFilterConjuncts) {
- brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+ if (Config.enable_vectorized_load && vpreFilterConjunct != null) {
+ brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift());
+ } else {
+ for (Expr e : preFilterConjuncts) {
+ brokerScanNode.addToPreFilterExprs(e.treeToThrift());
+ }
}
}
planNode.setBrokerScanNode(brokerScanNode);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index d43747ed11..f1bef30896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// 4. Filter data by using "conjuncts".
protected List<Expr> preFilterConjuncts = Lists.newArrayList();
+ protected Expr vpreFilterConjunct = null;
+
// Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
// assigned to a fragment. Set and maintained by enclosing PlanFragment.
protected PlanFragment fragment;
@@ -904,6 +906,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
initCompoundPredicate(vconjunct);
}
+ if (!preFilterConjuncts.isEmpty()) {
+ vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts);
+ initCompoundPredicate(vpreFilterConjunct);
+ }
+
for (PlanNode child : children) {
child.convertToVectoriezd();
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 94168a9074..d4d37e11ac 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -252,7 +252,7 @@ struct TBrokerScanNode {
// Partition info used to process partition select in broker load
2: optional list<Exprs.TExpr> partition_exprs
3: optional list<Partitions.TRangePartition> partition_infos
- 4: optional list<Exprs.TExpr> pre_filter_exprs
+ 4: optional list<Exprs.TExpr> pre_filter_exprs
}
struct TEsScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org