You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/13 12:46:26 UTC

[GitHub] [incubator-doris] Gabriel39 opened a new pull request, #10103: [feature] support runtime filter on vectorized engine

Gabriel39 opened a new pull request, #10103:
URL: https://github.com/apache/incubator-doris/pull/10103

   # Proposed changes
   
   Issue Number: close #10096
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r899745912


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+    return Status::OK();
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =

Review Comment:
   The nullable column needs to be handled here.



##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+    return Status::OK();
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);

Review Comment:
   RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896930882


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =
+                ((ColumnVector<UInt8>*)block->get_columns_with_type_and_name()[*result_column_id]
+                         .column.get())
+                        ->get_data()
+                        .data();
+        _filtered_rows +=
+                doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), block->rows());
+
+        if ((!_has_calculate_filter) && (_scan_rows > 0) && (_scan_rows.load() >= _loop_size)) {
+            double rate = (double)_filtered_rows / _scan_rows;
+            if (rate < _expect_filter_rate) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;
+        }
+        return status;
+    }
+}
+
+const std::string& VRuntimeFilterWrapper::expr_name() const {
+    return _impl->expr_name();

Review Comment:
   Maybe we can add something to show it is a `Wrapper`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897468507


##########
be/src/vec/exprs/vexpr.cpp:
##########
@@ -88,8 +98,8 @@ void VExpr::close(doris::RuntimeState* state, VExprContext* context,
     }
 }
 
-Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr_node,
-                          VExpr** expr) {
+Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr_node, VExpr** expr,

Review Comment:
   Use a new method `create_rf_expr` instead



##########
be/src/exec/olap_scan_node.cpp:
##########
@@ -91,6 +92,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
                                                                         &runtime_filter));
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+        _rf_locks.push_back(new std::mutex());

Review Comment:
   I changed it to a unique pointer, and I think this way is better.



##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =
+                ((ColumnVector<UInt8>*)block->get_columns_with_type_and_name()[*result_column_id]
+                         .column.get())
+                        ->get_data()
+                        .data();
+        _filtered_rows +=
+                doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), block->rows());
+
+        if ((!_has_calculate_filter) && (_scan_rows > 0) && (_scan_rows.load() >= _loop_size)) {
+            double rate = (double)_filtered_rows / _scan_rows;
+            if (rate < _expect_filter_rate) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;

Review Comment:
   > why we need `_has_calculate_filter`? seems `_scan_rows > 0` is e
   
   I just want to reuse the original logic from `exprs/bloomfilter_predicate.cpp` here. I think using `_has_calculate_filter` ensures that we calculate the filter rate only once, when the scanned row count is larger than `_loop_size`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/doris/pull/10103#discussion_r900115251


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+    return Status::OK();
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);
+        size_t num_columns_without_result = block->columns();
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));
+        uint8_t* data = nullptr;
+        const ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id);
+        if (auto* nullable = check_and_get_column<ColumnNullable>(*result_column.column)) {
+            data = ((ColumnVector<UInt8>*)nullable->get_nested_column_ptr().get())

Review Comment:
   The nullable value should also be handled here.



##########
be/src/vec/exec/volap_scan_node.cpp:
##########
@@ -389,29 +393,73 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         scanner->set_opened();
     }
 
-    std::vector<ExprContext*> contexts;
+    std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
     for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
         if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+            /// When runtime filters are ready during running, we should use them to filter data
+            /// in VOlapScanner.
+            /// New arrival rf will be processed as below:
+            /// 1. convert these runtime filters to vectorized expressions
+            /// 2. if this is the first scanner thread to receive this rf, construct a new
+            /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
+            /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
+            /// once after any runtime_filters are ready.
+            /// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
             IRuntimeFilter* runtime_filter = nullptr;
             state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
                                                             &runtime_filter);
             DCHECK(runtime_filter != nullptr);
             bool ready = runtime_filter->is_ready();
             if (ready) {
-                runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+                runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
                 scanner_filter_apply_marks[i] = true;

Review Comment:
   It's a `std::vector<bool>`; I am not sure it is safe to use in a multi-threaded environment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896401962


##########
be/src/vec/exprs/vbloom_predicate.cpp:
##########
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vbloom_predicate.h"
+
+#include <string_view>
+
+namespace doris::vectorized {
+
+VBloomPredicate::VBloomPredicate(const TExprNode& node)
+        : VExpr(node), _filter(nullptr), _expr_name("bloom_predicate") {}
+
+Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                VExprContext* context) {
+    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+
+    if (_is_prepare) {
+        return Status::OK();
+    }
+    if (_children.size() != 1) {
+        return Status::InternalError("Invalid argument for VBloomPredicate.");
+    }
+
+    _is_prepare = true;
+
+    ColumnsWithTypeAndName argument_template;
+    argument_template.reserve(_children.size());
+    for (auto child : _children) {
+        auto column = child->data_type()->create_column();
+        argument_template.emplace_back(std::move(column), child->data_type(), child->expr_name());
+    }
+    return Status::OK();
+}
+
+Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
+                             FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    return Status::OK();
+}
+
+void VBloomPredicate::close(RuntimeState* state, VExprContext* context,
+                            FunctionContext::FunctionStateScope scope) {
+    VExpr::close(state, context, scope);
+}
+
+Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) {
+    doris::vectorized::ColumnNumbers arguments(_children.size());
+    for (int i = 0; i < _children.size(); ++i) {
+        int column_id = -1;
+        _children[i]->execute(context, block, &column_id);
+        arguments[i] = column_id;
+    }
+    // call function
+    size_t num_columns_without_result = block->columns();
+    auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+
+    ColumnPtr argument_column =
+            block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    size_t sz = argument_column->size();
+    res_data_column->resize(sz);
+    res_data_column->reserve(sz);

Review Comment:
   useless `reserve`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896677175


##########
be/src/exec/olap_scan_node.cpp:
##########
@@ -91,6 +92,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
                                                                         &runtime_filter));
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+        _rf_locks.push_back(new std::mutex());

Review Comment:
   Better to use a smart pointer. Also, why not just store a `std::vector<std::mutex>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/doris/pull/10103#discussion_r900112175


##########
be/src/vec/exec/volap_scan_node.cpp:
##########
@@ -389,29 +393,73 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         scanner->set_opened();
     }
 
-    std::vector<ExprContext*> contexts;
+    std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
     for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
         if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+            /// When runtime filters are ready during running, we should use them to filter data
+            /// in VOlapScanner.
+            /// New arrival rf will be processed as below:
+            /// 1. convert these runtime filters to vectorized expressions
+            /// 2. if this is the first scanner thread to receive this rf, construct a new
+            /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
+            /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
+            /// once after any runtime_filters are ready.
+            /// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
             IRuntimeFilter* runtime_filter = nullptr;
             state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
                                                             &runtime_filter);
             DCHECK(runtime_filter != nullptr);
             bool ready = runtime_filter->is_ready();
             if (ready) {
-                runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+                runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
                 scanner_filter_apply_marks[i] = true;

Review Comment:
   It's a `std::vector<bool>`; I am not sure it is safe to use in a multi-threaded environment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
morningman commented on PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#issuecomment-1153891268

   > DO NOT MERGE now. I will continue to test this PR. Feel free to review this PR as I think this is almost done.
   
   You can convert this PR to draft if it is not ready for review. See `convert to draft` at the side bar


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896677175


##########
be/src/exec/olap_scan_node.cpp:
##########
@@ -91,6 +92,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
                                                                         &runtime_filter));
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+        _rf_locks.push_back(new std::mutex());

Review Comment:
   better to use smart pointer



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897700779


##########
be/src/vec/exprs/vbloom_predicate.h:
##########
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bloomfilter_predicate.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VBloomPredicate final : public VExpr {
+public:
+    VBloomPredicate(const TExprNode& node);
+    ~VBloomPredicate() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                                  int* result_column_id) override;
+    virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                                  VExprContext* context) override;
+    virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                               FunctionContext::FunctionStateScope scope) override;
+    virtual void close(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    virtual VExpr* clone(doris::ObjectPool* pool) const override {
+        auto cloned = pool->add(new VBloomPredicate(*this));
+        cloned->set_filter(_filter.get());

Review Comment:
   Will this cause a double delete and a core dump? Please rethink the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897497731


##########
be/src/util/simd/bits.h:
##########
@@ -57,5 +57,35 @@ inline uint32_t bytes32_mask_to_bits32_mask(const bool* data) {
     return bytes32_mask_to_bits32_mask(reinterpret_cast<const uint8_t*>(data));
 }
 
+inline size_t count_zero_num(const int8_t* data, size_t size) {
+    size_t num = 0;
+    const int8_t* end = data + size;
+
+#if defined(__SSE2__) && defined(__POPCNT__)

Review Comment:
   Please rethink whether we need to write the SIMD code ourselves. The following loop can be auto-vectorized by the compiler and can make use of `AVX2`:
   ```
       for (; data < end; ++data) {
           num += (*data == 0);
       }
   ```



##########
be/src/vec/exprs/vruntimefilter_wrapper.h:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VRuntimeFilterWrapper final : public VExpr {
+public:
+    VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+    VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+    ~VRuntimeFilterWrapper() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                                  int* result_column_id) override;
+    virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                                  VExprContext* context) override;
+    virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                               FunctionContext::FunctionStateScope scope) override;
+    virtual std::string debug_string() const override { return _impl->debug_string(); };
+    virtual bool is_constant() const override;
+    virtual void close(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    virtual VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VRuntimeFilterWrapper(*this));
+    }
+    virtual const std::string& expr_name() const override;
+
+    virtual DataTypePtr& data_type() override { return _impl->data_type(); }
+
+    virtual TypeDescriptor type() override { return _impl->type(); }
+
+    virtual bool is_slot_ref() const override { return _impl->is_slot_ref(); }
+
+    virtual TExprNodeType::type node_type() const override { return _impl->node_type(); }
+
+    virtual void add_child(VExpr* expr) override { _impl->add_child(expr); }
+
+    virtual bool is_nullable() const override { return _impl->is_nullable(); }
+
+    virtual PrimitiveType result_type() const override { return _impl->result_type(); }
+
+    virtual const std::vector<VExpr*>& children() const override { return _impl->children(); }
+    virtual void set_children(std::vector<VExpr*> children) override {
+        _impl->set_children(children);
+    }
+
+    virtual bool is_and_expr() const override { return _impl->is_and_expr(); }
+
+    virtual const TFunction& fn() const override { return _impl->fn(); }
+    virtual ColumnPtrWrapper* get_const_col(VExprContext* context) override {
+        return _impl->get_const_col(context);
+    }
+
+protected:

Review Comment:
   The class is `final`, so change `protected` to `private`.



##########
be/src/vec/exprs/vruntimefilter_wrapper.h:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VRuntimeFilterWrapper final : public VExpr {
+public:
+    VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+    VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+    ~VRuntimeFilterWrapper() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,

Review Comment:
   The class is `final`, so these methods do not need the `virtual` keyword.



##########
be/src/vec/exprs/vruntimefilter_wrapper.h:
##########
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VRuntimeFilterWrapper final : public VExpr {
+public:
+    VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+    VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+    ~VRuntimeFilterWrapper() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                                  int* result_column_id) override;
+    virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                                  VExprContext* context) override;
+    virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                               FunctionContext::FunctionStateScope scope) override;
+    virtual std::string debug_string() const override { return _impl->debug_string(); };
+    virtual bool is_constant() const override;
+    virtual void close(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    virtual VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VRuntimeFilterWrapper(*this));
+    }
+    virtual const std::string& expr_name() const override;
+
+    virtual DataTypePtr& data_type() override { return _impl->data_type(); }
+
+    virtual TypeDescriptor type() override { return _impl->type(); }
+
+    virtual bool is_slot_ref() const override { return _impl->is_slot_ref(); }
+
+    virtual TExprNodeType::type node_type() const override { return _impl->node_type(); }
+
+    virtual void add_child(VExpr* expr) override { _impl->add_child(expr); }
+
+    virtual bool is_nullable() const override { return _impl->is_nullable(); }
+
+    virtual PrimitiveType result_type() const override { return _impl->result_type(); }
+
+    virtual const std::vector<VExpr*>& children() const override { return _impl->children(); }
+    virtual void set_children(std::vector<VExpr*> children) override {
+        _impl->set_children(children);
+    }
+
+    virtual bool is_and_expr() const override { return _impl->is_and_expr(); }
+
+    virtual const TFunction& fn() const override { return _impl->fn(); }
+    virtual ColumnPtrWrapper* get_const_col(VExprContext* context) override {
+        return _impl->get_const_col(context);
+    }
+
+protected:
+    /// Helper function that calls ctx->register(), sets fn_context_index_, and returns the
+    /// registered FunctionContext
+    virtual void register_function_context(doris::RuntimeState* state,
+                                           VExprContext* context) override {
+        _impl->register_function_context(state, context);
+    }
+
+    /// Helper function to initialize function context, called in `open` phase of VExpr:
+    /// 1. Set constant columns result of function arguments.
+    /// 2. Call function's prepare() to initialize function state, fragment-local or
+    /// thread-local according the input `FunctionStateScope` argument.
+    virtual Status init_function_context(VExprContext* context,
+                                         FunctionContext::FunctionStateScope scope,
+                                         const FunctionBasePtr& function) const override {
+        return _impl->init_function_context(context, scope, function);
+    }
+
+    /// Helper function to close function context, fragment-local or thread-local according
+    /// the input `FunctionStateScope` argument. Called in `close` phase of VExpr.
+    virtual void close_function_context(VExprContext* context,
+                                        FunctionContext::FunctionStateScope scope,
+                                        const FunctionBasePtr& function) const override {
+        _impl->close_function_context(context, scope, function);
+    }
+
+private:
+    VExpr* _impl;
+
+    bool _always_true;
+    /// TODO: statistic filter rate in the profile
+    std::atomic<int64_t> _filtered_rows;
+    std::atomic<int64_t> _scan_rows;
+
+    bool _has_calculate_filter = false;
+    // loop size must be power of 2
+    constexpr static int64_t _loop_size = 8192;

Review Comment:
   This is a constexpr constant; name it `LOOP_SIZE`.



##########
be/src/exec/olap_scan_node.h:
##########
@@ -280,6 +280,8 @@ class OlapScanNode : public ScanNode {
     };
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+    std::vector<bool> _runtime_filter_ready_flag;
+    std::vector<std::unique_ptr<std::mutex>> _rf_locks;

Review Comment:
   why not `std::vector<std::mutex>` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#issuecomment-1154957071

   Ready for reviewing now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896937161


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {

Review Comment:
   Is it possible that `_data_type` is nullable? I think it may always be uint8



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896942232


##########
be/src/vec/exprs/vexpr.cpp:
##########
@@ -88,8 +98,8 @@ void VExpr::close(doris::RuntimeState* state, VExprContext* context,
     }
 }
 
-Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr_node,
-                          VExpr** expr) {
+Status VExpr::create_expr(doris::ObjectPool* pool, const doris::TExprNode& texpr_node, VExpr** expr,

Review Comment:
   not sure when `is_runtime_filter` will be true



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896402746


##########
be/src/vec/exprs/vbloom_predicate.cpp:
##########
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vbloom_predicate.h"
+
+#include <string_view>
+
+namespace doris::vectorized {
+
+VBloomPredicate::VBloomPredicate(const TExprNode& node)
+        : VExpr(node), _filter(nullptr), _expr_name("bloom_predicate") {}
+
+Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                VExprContext* context) {
+    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+
+    if (_is_prepare) {
+        return Status::OK();
+    }
+    if (_children.size() != 1) {
+        return Status::InternalError("Invalid argument for VBloomPredicate.");
+    }
+
+    _is_prepare = true;
+
+    ColumnsWithTypeAndName argument_template;
+    argument_template.reserve(_children.size());
+    for (auto child : _children) {
+        auto column = child->data_type()->create_column();
+        argument_template.emplace_back(std::move(column), child->data_type(), child->expr_name());
+    }
+    return Status::OK();
+}
+
+Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
+                             FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    return Status::OK();
+}
+
+void VBloomPredicate::close(RuntimeState* state, VExprContext* context,
+                            FunctionContext::FunctionStateScope scope) {
+    VExpr::close(state, context, scope);
+}
+
+Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) {
+    doris::vectorized::ColumnNumbers arguments(_children.size());
+    for (int i = 0; i < _children.size(); ++i) {
+        int column_id = -1;
+        _children[i]->execute(context, block, &column_id);
+        arguments[i] = column_id;
+    }
+    // call function
+    size_t num_columns_without_result = block->columns();
+    auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+
+    ColumnPtr argument_column =
+            block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+    size_t sz = argument_column->size();
+    res_data_column->resize(sz);
+    res_data_column->reserve(sz);
+    auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+    for (size_t i = 0; i < sz; i++) {
+        if (_filter->find(reinterpret_cast<const void*>(argument_column->get_data_at(i).data))) {
+            ptr[i] = 1;

Review Comment:
   maybe we can just `ptr[i] = _filter->find(reinterpret_cast<const void*>(argument_column->get_data_at(i).data))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896679320


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -193,22 +196,8 @@ PFilterType get_type(RuntimeFilterType type) {
     }
 }
 
-TTypeDesc create_type_desc(PrimitiveType type) {
-    TTypeDesc type_desc;
-    std::vector<TTypeNode> node_type;
-    node_type.emplace_back();
-    TScalarType scalarType;
-    scalarType.__set_type(to_thrift(type));
-    scalarType.__set_len(-1);
-    scalarType.__set_precision(-1);
-    scalarType.__set_scale(-1);
-    node_type.back().__set_scalar_type(scalarType);
-    type_desc.__set_types(node_type);
-    return type_desc;
-}
-
-// only used to push down to olap engine
-Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
+template <bool IS_VECTORIZED = false>

Review Comment:
   It would be better to just use a lowercase name for this template parameter (e.g. `is_vectorized`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896928158


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =
+                ((ColumnVector<UInt8>*)block->get_columns_with_type_and_name()[*result_column_id]
+                         .column.get())
+                        ->get_data()
+                        .data();
+        _filtered_rows +=
+                doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), block->rows());
+
+        if ((!_has_calculate_filter) && (_scan_rows > 0) && (_scan_rows.load() >= _loop_size)) {
+            double rate = (double)_filtered_rows / _scan_rows;
+            if (rate < _expect_filter_rate) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;

Review Comment:
   Why do we need `_has_calculate_filter`? It seems `_scan_rows > 0` is enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#issuecomment-1153872445

   DO NOT MERGE now. I will continue to test this PR. Feel free to review this PR as I think this is almost done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897490873


##########
be/src/exec/olap_scan_node.h:
##########
@@ -280,6 +280,8 @@ class OlapScanNode : public ScanNode {
     };
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+    std::vector<bool> _runtime_filter_ready_flag;
+    std::vector<std::unique_ptr<std::mutex>> _rf_locks;

Review Comment:
   Why not `std::vector<std::mutex>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896586249


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -294,6 +283,95 @@ Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
     return pool->add(new Literal(node));
 }
 
+Status create_literal(ObjectPool* pool, PrimitiveType type, const void* data,
+                      doris::vectorized::VExpr** vexpr) {
+    TExprNode node;
+
+    switch (type) {
+    case TYPE_BOOLEAN: {
+        TBoolLiteral boolLiteral;
+        boolLiteral.__set_value(*reinterpret_cast<const bool*>(data));
+        node.__set_bool_literal(boolLiteral);
+        break;
+    }
+    case TYPE_TINYINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int8_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int16_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_INT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int32_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_BIGINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int64_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        TLargeIntLiteral largeIntLiteral;
+        largeIntLiteral.__set_value(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(data)));
+        node.__set_large_int_literal(largeIntLiteral);
+        break;
+    }
+    case TYPE_FLOAT: {
+        TFloatLiteral floatLiteral;
+        floatLiteral.__set_value(*reinterpret_cast<const float*>(data));
+        node.__set_float_literal(floatLiteral);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        TFloatLiteral floatLiteral;
+        floatLiteral.__set_value(*reinterpret_cast<const double*>(data));
+        node.__set_float_literal(floatLiteral);
+        break;
+    }
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        TDateLiteral dateLiteral;
+        char convert_buffer[30];
+        reinterpret_cast<const DateTimeValue*>(data)->to_string(convert_buffer);
+        dateLiteral.__set_value(convert_buffer);
+        node.__set_date_literal(dateLiteral);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        TDecimalLiteral decimalLiteral;
+        decimalLiteral.__set_value(reinterpret_cast<const DecimalV2Value*>(data)->to_string());
+        node.__set_decimal_literal(decimalLiteral);
+        break;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+        const StringValue* string_value = reinterpret_cast<const StringValue*>(data);
+        TStringLiteral tstringLiteral;
+        tstringLiteral.__set_value(std::string(string_value->ptr, string_value->len));
+        node.__set_string_literal(tstringLiteral);
+        break;
+    }
+    default:
+        DCHECK(false);
+        return Status::InvalidArgument("Invalid type!");
+    }
+    node.__set_node_type(get_expr_node_type(type));
+    node.__set_type(create_type_desc(type));
+
+    *vexpr = pool->add(new doris::vectorized::VLiteral(node));

Review Comment:
   Updated. Thanks for your advice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897484770


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {

Review Comment:
   @BiteTheDDDDt I noticed that not only the runtime filter but all predicates now return nullable types. I'm not sure whether we could change all of them to non-nullable types. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896680100


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);

Review Comment:
   same problems



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897425517


##########
be/src/vec/exec/volap_scan_node.cpp:
##########
@@ -162,29 +163,63 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         scanner->set_opened();
     }
 
-    std::vector<ExprContext*> contexts;
+    std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
     for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
         if (!scanner_filter_apply_marks[i] && !_runtime_filter_ctxs[i].apply_mark) {
+            /// When runtime filters are ready during running, we should use them to filter data
+            /// in VOlapScanner.
+            /// New arrival rf will be processed as below:
+            /// 1. convert these runtime filters to vectorized expressions
+            /// 2. if this is the first scanner thread to receive this rf, construct a new
+            /// VExprContext and update `_vconjunct_ctx_ptr` in scan node. Notice that we use
+            /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr` will be updated only
+            /// once after any runtime_filters are ready.
+            /// 3. finally, just copy this new VExprContext to scanner and use it to filter data.
             IRuntimeFilter* runtime_filter = nullptr;
             state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
                                                             &runtime_filter);
             DCHECK(runtime_filter != nullptr);
             bool ready = runtime_filter->is_ready();
             if (ready) {
-                runtime_filter->get_prepared_context(&contexts, row_desc(), _expr_mem_tracker);
+                runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker);
                 scanner_filter_apply_marks[i] = true;
+                {
+                    std::unique_lock<std::mutex> l(*(_rf_locks[i]));
+                    if (!_runtime_filter_ready_flag[i]) {
+                        // Use all conjuncts and new arrival runtime filters to construct a new
+                        // expression tree here.
+                        auto last_expr =
+                                _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
+                        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+                            TExprNode texpr_node;
+                            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+                            texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+                            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+                            VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+                            new_node->add_child(last_expr);
+                            new_node->add_child(vexprs[j]);
+                            last_expr = new_node;
+                        }
+                        _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+                        auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+                        WARN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc(),

Review Comment:
   If prepare or open fails, `_vconjunct_ctx_ptr` has already been reset. Won't the result be wrong in that case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] yiguolei merged pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei merged PR #10103:
URL: https://github.com/apache/doris/pull/10103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896396705


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -294,6 +283,95 @@ Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data) {
     return pool->add(new Literal(node));
 }
 
+Status create_literal(ObjectPool* pool, PrimitiveType type, const void* data,
+                      doris::vectorized::VExpr** vexpr) {
+    TExprNode node;
+
+    switch (type) {
+    case TYPE_BOOLEAN: {
+        TBoolLiteral boolLiteral;
+        boolLiteral.__set_value(*reinterpret_cast<const bool*>(data));
+        node.__set_bool_literal(boolLiteral);
+        break;
+    }
+    case TYPE_TINYINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int8_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int16_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_INT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int32_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_BIGINT: {
+        TIntLiteral intLiteral;
+        intLiteral.__set_value(*reinterpret_cast<const int64_t*>(data));
+        node.__set_int_literal(intLiteral);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        TLargeIntLiteral largeIntLiteral;
+        largeIntLiteral.__set_value(
+                LargeIntValue::to_string(*reinterpret_cast<const int128_t*>(data)));
+        node.__set_large_int_literal(largeIntLiteral);
+        break;
+    }
+    case TYPE_FLOAT: {
+        TFloatLiteral floatLiteral;
+        floatLiteral.__set_value(*reinterpret_cast<const float*>(data));
+        node.__set_float_literal(floatLiteral);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        TFloatLiteral floatLiteral;
+        floatLiteral.__set_value(*reinterpret_cast<const double*>(data));
+        node.__set_float_literal(floatLiteral);
+        break;
+    }
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        TDateLiteral dateLiteral;
+        char convert_buffer[30];
+        reinterpret_cast<const DateTimeValue*>(data)->to_string(convert_buffer);
+        dateLiteral.__set_value(convert_buffer);
+        node.__set_date_literal(dateLiteral);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        TDecimalLiteral decimalLiteral;
+        decimalLiteral.__set_value(reinterpret_cast<const DecimalV2Value*>(data)->to_string());
+        node.__set_decimal_literal(decimalLiteral);
+        break;
+    }
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+        const StringValue* string_value = reinterpret_cast<const StringValue*>(data);
+        TStringLiteral tstringLiteral;
+        tstringLiteral.__set_value(std::string(string_value->ptr, string_value->len));
+        node.__set_string_literal(tstringLiteral);
+        break;
+    }
+    default:
+        DCHECK(false);
+        return Status::InvalidArgument("Invalid type!");
+    }
+    node.__set_node_type(get_expr_node_type(type));
+    node.__set_type(create_type_desc(type));
+
+    *vexpr = pool->add(new doris::vectorized::VLiteral(node));

Review Comment:
   Maybe we can reuse `Expr* create_literal(ObjectPool* pool, PrimitiveType type, const void* data)` by making it a template function that constructs the node with something like `new T(node)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r896933670


##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =
+                ((ColumnVector<UInt8>*)block->get_columns_with_type_and_name()[*result_column_id]
+                         .column.get())
+                        ->get_data()
+                        .data();
+        _filtered_rows +=
+                doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), block->rows());
+
+        if ((!_has_calculate_filter) && (_scan_rows > 0) && (_scan_rows.load() >= _loop_size)) {
+            double rate = (double)_filtered_rows / _scan_rows;
+            if (rate < _expect_filter_rate) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;

Review Comment:
   `(_scan_rows > 0) && (_scan_rows.load() >= _loop_size)`
   Is this check somewhat redundant?



##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {
+            auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
+            block->insert({ColumnNullable::create(std::move(res_data_column), std::move(null_map)),
+                           _data_type, expr_name()});
+        } else {
+            block->insert({std::move(res_data_column), _data_type, expr_name()});
+        }
+        *result_column_id = num_columns_without_result;
+        return Status::OK();
+    } else {
+        _scan_rows += block->rows();
+        auto status = _impl->execute(context, block, result_column_id);
+        auto* data =
+                ((ColumnVector<UInt8>*)block->get_columns_with_type_and_name()[*result_column_id]
+                         .column.get())
+                        ->get_data()
+                        .data();
+        _filtered_rows +=
+                doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(data), block->rows());
+
+        if ((!_has_calculate_filter) && (_scan_rows > 0) && (_scan_rows.load() >= _loop_size)) {
+            double rate = (double)_filtered_rows / _scan_rows;
+            if (rate < _expect_filter_rate) {
+                _always_true = true;
+            }
+            _has_calculate_filter = true;

Review Comment:
   `(_scan_rows > 0) && (_scan_rows.load() >= _loop_size)`
   Is this check somewhat redundant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897427437


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -986,6 +980,21 @@ Status IRuntimeFilter::get_prepared_context(std::vector<ExprContext*>* push_expr
     return Status::OK();
 }
 
+Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>* vexprs,
+                                           const RowDescriptor& desc,
+                                           const std::shared_ptr<MemTracker>& tracker) {
+    DCHECK(_is_ready);
+    DCHECK(is_consumer());
+    std::lock_guard<std::mutex> guard(_inner_mutex);
+
+    if (_push_down_vexprs.empty()) {

Review Comment:
   _push_down_vexprs should be a local variable, not a class member.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/doris/pull/10103#discussion_r899969452


##########
be/src/vec/exprs/vbloom_predicate.h:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bloomfilter_predicate.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VBloomPredicate final : public VExpr {
+public:
+    VBloomPredicate(const TExprNode& node);
+    ~VBloomPredicate() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                                  int* result_column_id) override;
+    virtual doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                                  VExprContext* context) override;
+    virtual doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                               FunctionContext::FunctionStateScope scope) override;
+    virtual void close(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    virtual VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VBloomPredicate(*this));
+    }
+    virtual const std::string& expr_name() const override;
+    void set_filter(IBloomFilterFuncBase* filter);

Review Comment:
   This interface is dangerous and can cause a double delete; if it is not needed, please delete it.



##########
be/src/vec/exprs/vbloom_predicate.h:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exprs/bloomfilter_predicate.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VBloomPredicate final : public VExpr {
+public:
+    VBloomPredicate(const TExprNode& node);
+    ~VBloomPredicate() = default;
+    virtual doris::Status execute(VExprContext* context, doris::vectorized::Block* block,

Review Comment:
   The class is `final`, so these overriding methods do not need the `virtual` keyword.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #10103: [feature] support runtime filter on vectorized engine

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r899742933


##########
be/src/vec/exprs/vruntimefilter_wrapper.h:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+class VRuntimeFilterWrapper final : public VExpr {
+public:
+    VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl);
+    VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr);
+    ~VRuntimeFilterWrapper() = default;
+    doris::Status execute(VExprContext* context, doris::vectorized::Block* block,
+                          int* result_column_id) override;
+    doris::Status prepare(doris::RuntimeState* state, const doris::RowDescriptor& desc,
+                          VExprContext* context) override;
+    doris::Status open(doris::RuntimeState* state, VExprContext* context,
+                       FunctionContext::FunctionStateScope scope) override;
+    std::string debug_string() const override { return _impl->debug_string(); };
+    bool is_constant() const override;
+    void close(doris::RuntimeState* state, VExprContext* context,
+               FunctionContext::FunctionStateScope scope) override;
+    VExpr* clone(doris::ObjectPool* pool) const override {
+        return pool->add(new VRuntimeFilterWrapper(*this));
+    }
+    const std::string& expr_name() const override;
+
+    ColumnPtrWrapper* get_const_col(VExprContext* context) override {
+        return _impl->get_const_col(context);
+    }
+
+private:
+    VExpr* _impl;
+
+    bool _always_true;
+    /// TODO: statistic filter rate in the profile
+    std::atomic<int64_t> _filtered_rows;
+    std::atomic<int64_t> _scan_rows;
+
+    bool _has_calculate_filter = false;
+    // loop size must be power of 2
+    constexpr static int64_t _threshold_to_calculate_rate = 8192;

Review Comment:
   A `constexpr` constant should use an `UPPER_CASE` name; there is no need for the `_` prefix.



##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "exprs/create_predicate_function.h"
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    return _impl->prepare(state, desc, context);
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());
+        size_t num_columns_without_result = block->columns();
+        size_t sz = block->rows();
+        res_data_column->resize(sz);
+        res_data_column->reserve(sz);
+        auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = 1;
+        }
+        if (_data_type->is_nullable()) {

Review Comment:
   Yes, there is no need to change the `uint8`.



##########
be/src/vec/exprs/vruntimefilter_wrapper.cpp:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vruntimefilter_wrapper.h"
+
+#include <string_view>
+
+#include "util/simd/bits.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_set.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl)
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {}
+
+VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr)
+        : VExpr(vexpr),
+          _impl(vexpr._impl),
+          _always_true(vexpr._always_true),
+          _filtered_rows(vexpr._filtered_rows.load()),
+          _scan_rows(vexpr._scan_rows.load()) {}
+
+Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                      VExprContext* context) {
+    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+    return Status::OK();
+}
+
+Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    return _impl->open(state, context, scope);
+}
+
+void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
+                                  FunctionContext::FunctionStateScope scope) {
+    _impl->close(state, context, scope);
+}
+
+bool VRuntimeFilterWrapper::is_constant() const {
+    return _impl->is_constant();
+}
+
+Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
+    if (_always_true) {
+        auto res_data_column = ColumnVector<UInt8>::create(block->rows());

Review Comment:
   `auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);`



##########
be/src/util/simd/bits.h:
##########
@@ -57,5 +57,35 @@ inline uint32_t bytes32_mask_to_bits32_mask(const bool* data) {
     return bytes32_mask_to_bits32_mask(reinterpret_cast<const uint8_t*>(data));
 }
 
+inline size_t count_zero_num(const int8_t* data, size_t size) {
+    size_t num = 0;
+    const int8_t* end = data + size;
+
+#if defined(__SSE2__) && defined(__POPCNT__)

Review Comment:
   delete the code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org