You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/29 17:12:40 UTC

[incubator-doris] branch dev-1.0.1 updated (aa86a7b68e -> 9b2bd8b87b)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from aa86a7b68e [cherry-pick] Some missing file for #9037
     new 1cda120c8c [Bug] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)
     new b6eb5c83aa [fix](storage)bloom filter support ColumnDict (#9167)
     new a19a4822de [Enhancement][Vectorized] Improve hash table build efficiency (#9250)
     new 3659b8bb90 [fix](vectorized) Query get wrong result when ColumnDict concurrent predicate eval (#9270)
     new 9b2bd8b87b [fix](materialized-view) fix bug that can not create mv for list partitioned table (#9281)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/CMakeLists.txt                                  |  8 ++
 be/src/exprs/bloomfilter_predicate.h               | 28 +++++--
 be/src/olap/bloom_filter_predicate.h               | 28 ++++++-
 be/src/olap/column_predicate.h                     |  4 +-
 be/src/olap/comparison_predicate.cpp               | 96 ++++------------------
 be/src/olap/comparison_predicate.h                 |  3 -
 be/src/olap/in_list_predicate.cpp                  | 37 +--------
 be/src/olap/in_list_predicate.h                    |  5 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  3 +-
 be/src/runtime/data_stream_recvr.cc                |  2 +-
 be/src/vec/columns/column_dictionary.h             | 42 +++++++++-
 be/src/vec/common/allocator.h                      |  7 ++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  5 +-
 .../java/org/apache/doris/catalog/OlapTable.java   | 20 +++--
 .../org/apache/doris/alter/AlterJobV2Test.java     | 41 ++++++++-
 .../doris/alter/MaterializedViewHandlerTest.java   |  5 +-
 16 files changed, 177 insertions(+), 157 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/05: [fix](storage)bloom filter support ColumnDict (#9167)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b6eb5c83aa399a2eeadad5c95a3400817e2daa08
Author: wangbo <wa...@apache.org>
AuthorDate: Thu Apr 28 20:03:26 2022 +0800

    [fix](storage)bloom filter support ColumnDict (#9167)
    
    Bloom filter predicates now support ColumnDict columns (#9167).
---
 be/src/exprs/bloomfilter_predicate.h   | 28 ++++++++++++++++-------
 be/src/olap/bloom_filter_predicate.h   | 28 +++++++++++++++++++----
 be/src/vec/columns/column_dictionary.h | 42 ++++++++++++++++++++++++++++++++--
 3 files changed, 84 insertions(+), 14 deletions(-)

diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h
index 2af9e32211..1e5fa5bebb 100644
--- a/be/src/exprs/bloomfilter_predicate.h
+++ b/be/src/exprs/bloomfilter_predicate.h
@@ -61,8 +61,9 @@ public:
 
     size_t size() { return _bloom_filter->directory().size; }
 
-    bool test_bytes(const char* data, size_t len) const {
-        return _bloom_filter->find(Slice(data, len));
+    template <typename T>
+    bool test(T data) const {
+        return _bloom_filter->find(data);
     }
 
     void add_bytes(const char* data, size_t len) { _bloom_filter->insert(Slice(data, len)); }
@@ -83,6 +84,7 @@ public:
     virtual void insert(const void* data) = 0;
     virtual bool find(const void* data) const = 0;
     virtual bool find_olap_engine(const void* data) const = 0;
+    virtual bool find_uint32_t(uint32_t data) const = 0;
 
     virtual Status merge(IBloomFilterFuncBase* bloomfilter_func) = 0;
     virtual Status assign(const char* data, int len) = 0;
@@ -172,12 +174,15 @@ struct CommonFindOp {
         bloom_filter.add_bytes((char*)data, sizeof(T));
     }
     ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
-        return bloom_filter.test_bytes((char*)data, sizeof(T));
+        return bloom_filter.test(Slice((char*)data, sizeof(T)));
     }
     ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
                                         const void* data) const {
         return this->find(bloom_filter, data);
     }
+    ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+        return bloom_filter.test(data);
+    }
 };
 
 template <class BloomFilterAdaptor>
@@ -193,12 +198,15 @@ struct StringFindOp {
         if (value == nullptr) {
             return false;
         }
-        return bloom_filter.test_bytes(value->ptr, value->len);
+        return bloom_filter.test(Slice(value->ptr, value->len));
     }
     ALWAYS_INLINE bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
                                         const void* data) const {
         return StringFindOp::find(bloom_filter, data);
     }
+    ALWAYS_INLINE bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+        return bloom_filter.test(data);
+    }
 };
 
 // We do not need to judge whether data is empty, because null will not appear
@@ -211,7 +219,7 @@ struct FixedStringFindOp : public StringFindOp<BloomFilterAdaptor> {
         int64_t size = value->len;
         char* data = value->ptr;
         while (size > 0 && data[size - 1] == '\0') size--;
-        return bloom_filter.test_bytes(value->ptr, size);
+        return bloom_filter.test(Slice(value->ptr, size));
     }
 };
 
@@ -220,7 +228,7 @@ struct DateTimeFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
     bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) const {
         DateTimeValue value;
         value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
-        return bloom_filter.test_bytes((char*)&value, sizeof(DateTimeValue));
+        return bloom_filter.test(Slice((char*)&value, sizeof(DateTimeValue)));
     }
 };
 
@@ -239,7 +247,7 @@ struct DateFindOp : public CommonFindOp<DateTimeValue, BloomFilterAdaptor> {
 
         char data_bytes[sizeof(date_value)];
         memcpy(&data_bytes, &date_value, sizeof(date_value));
-        return bloom_filter.test_bytes(data_bytes, sizeof(DateTimeValue));
+        return bloom_filter.test(Slice(data_bytes, sizeof(DateTimeValue)));
     }
 };
 
@@ -255,7 +263,7 @@ struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value, BloomFilterAdaptor>
         constexpr int decimal_value_sz = sizeof(DecimalV2Value);
         char data_bytes[decimal_value_sz];
         memcpy(&data_bytes, &value, decimal_value_sz);
-        return bloom_filter.test_bytes(data_bytes, decimal_value_sz);
+        return bloom_filter.test(Slice(data_bytes, decimal_value_sz));
     }
 };
 
@@ -315,6 +323,10 @@ public:
     bool find_olap_engine(const void* data) const override {
         return dummy.find_olap_engine(*this->_bloom_filter, data);
     }
+    
+    bool find_uint32_t(uint32_t data) const override {
+        return dummy.find(*this->_bloom_filter, data);
+    }
 
 private:
     typename BloomFilterTypeTraits<type, BloomFilterAdaptor>::FindOp dummy;
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index fa65293d30..01851cfd96 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -31,6 +31,7 @@
 #include "vec/columns/column_vector.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/utils/util.hpp"
+#include "vec/columns/column_dictionary.h"
 
 namespace doris {
 
@@ -116,14 +117,33 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
     if (column.is_nullable()) {
         auto* nullable_col = vectorized::check_and_get_column<vectorized::ColumnNullable>(column);
         auto& null_map_data = nullable_col->get_null_map_column().get_data();
-        auto* pred_col = vectorized::check_and_get_column<vectorized::PredicateColumnType<FT>>(
+        // deal ColumnDict
+        if (nullable_col->get_nested_column().is_column_dictionary()) {
+            auto* dict_col = vectorized::check_and_get_column<vectorized::ColumnDictI32>(nullable_col->get_nested_column());
+            const_cast<vectorized::ColumnDictI32*>(dict_col)->generate_hash_values();
+            for (uint16_t i = 0; i < *size; i++) {
+                uint16_t idx = sel[i];
+                sel[new_size] = idx;
+                new_size += (!null_map_data[idx]) && _specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
+            }
+        } else {
+            auto* pred_col = vectorized::check_and_get_column<vectorized::PredicateColumnType<FT>>(
                 nullable_col->get_nested_column());
-        auto& pred_col_data = pred_col->get_data();
+            auto& pred_col_data = pred_col->get_data();
+            for (uint16_t i = 0; i < *size; i++) {
+                uint16_t idx = sel[i];
+                sel[new_size] = idx;
+                const auto* cell_value = reinterpret_cast<const void*>(&(pred_col_data[idx]));
+                new_size += (!null_map_data[idx]) && _specific_filter->find_olap_engine(cell_value);
+            }
+        }
+    } else if (column.is_column_dictionary()) {
+        auto* dict_col = vectorized::check_and_get_column<vectorized::ColumnDictI32>(column);
+        const_cast<vectorized::ColumnDictI32*>(dict_col)->generate_hash_values();
         for (uint16_t i = 0; i < *size; i++) {
             uint16_t idx = sel[i];
             sel[new_size] = idx;
-            const auto* cell_value = reinterpret_cast<const void*>(&(pred_col_data[idx]));
-            new_size += (!null_map_data[idx]) && _specific_filter->find_olap_engine(cell_value);
+            new_size += _specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
         }
     } else {
         auto* pred_col =
diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index 76f9516c9c..7d7117aee9 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -67,6 +67,7 @@ public:
     using value_type = T;
     using Container = PaddedPODArray<value_type>;
     using DictContainer = PaddedPODArray<StringValue>;
+    using HashValueContainer = PaddedPODArray<uint32_t>; // used for bloom filter
 
     bool is_column_dictionary() const override { return true; }
 
@@ -106,6 +107,7 @@ public:
     void clear() override {
         _codes.clear();
         _dict_code_converted = false;
+        _dict.clear_hash_values();
     }
 
     // TODO: Make dict memory usage more precise
@@ -251,6 +253,14 @@ public:
         return _dict.find_code_by_bound(value, greater, eq);
     }
 
+    void generate_hash_values() {
+        _dict.generate_hash_values();
+    }
+
+    uint32_t get_hash_value(uint32_t idx) const {
+        return _dict.get_hash_value(_codes[idx]);
+    }
+
     phmap::flat_hash_set<int32_t> find_codes(
             const phmap::flat_hash_set<StringValue>& values) const {
         return _dict.find_codes(values);
@@ -297,6 +307,23 @@ public:
             return -1;
         }
 
+        inline StringValue& get_value(T code) { return _dict_data[code]; }
+        
+        inline void generate_hash_values() {
+            if (_hash_values.size() == 0) {
+                _hash_values.resize(_dict_data.size());
+                for (size_t i = 0; i < _dict_data.size(); i++) {
+                    auto& sv = _dict_data[i];
+                    uint32_t hash_val = HashUtil::murmur_hash3_32(sv.ptr, sv.len, 0);
+                    _hash_values[i] = hash_val;
+                }
+            }
+        }
+
+        inline uint32_t get_hash_value(T code) const {
+            return _hash_values[code];
+        }
+
         // For > , code takes upper_bound - 1; For >= , code takes upper_bound
         // For < , code takes upper_bound; For <=, code takes upper_bound - 1
         // For example a sorted dict: <'b',0> <'c',1> <'d',2>
@@ -336,12 +363,15 @@ public:
             return code_set;
         }
 
-        inline StringValue& get_value(T code) { return _dict_data[code]; }
-
         void clear() {
             _dict_data.clear();
             _inverted_index.clear();
             _code_convert_map.clear();
+            _hash_values.clear();
+        }
+
+        void clear_hash_values() {
+            _hash_values.clear();
         }
 
         void sort() {
@@ -365,6 +395,12 @@ public:
         phmap::flat_hash_map<StringValue, T, StringValue::HashOfStringValue> _inverted_index;
         // data page code -> sorted dict code, only used for range comparison predicate
         phmap::flat_hash_map<T, T> _code_convert_map;
+        // hash value of origin string , used for bloom filter
+        // It's a trade-off of space for performance
+        // But in TPC-DS 1GB q60,we see no significant improvement. 
+        // This may because the magnitude of the data is not large enough(in q60, only about 80k rows data is filtered for largest table)
+        // So we may need more test here.
+        HashValueContainer _hash_values;
     };
 
 private:
@@ -381,4 +417,6 @@ template class ColumnDictionary<uint16_t>;
 template class ColumnDictionary<uint32_t>;
 template class ColumnDictionary<int32_t>;
 
+using ColumnDictI32 = vectorized::ColumnDictionary<doris::vectorized::Int32>;
+
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 03/05: [Enhancement][Vectorized] Improve hash table build efficiency (#9250)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit a19a4822de66ff688f8e4aded0046eb61eab7c89
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:26:33 2022 +0800

    [Enhancement][Vectorized] Improve hash table build efficiency (#9250)
    
    1. MAP_POPULATE was not applied to mmap in Allocator because the OS_LINUX macro was never defined when compiling allocator.h;
    2. MAP_POPULATE has no effect on mremap (unlike mmap), so explicitly zero-fill the enlarged memory range to pre-fault the pages.
---
 be/CMakeLists.txt             | 8 ++++++++
 be/src/vec/common/allocator.h | 7 +++++++
 2 files changed, 15 insertions(+)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 14183cf0d7..687a9629ca 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -43,6 +43,14 @@ if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(ppc64le.*|PPC64LE.*)")
     set (ARCH_PPC64LE 1)
 endif ()
 
+if (CMAKE_SYSTEM_NAME MATCHES "Linux")
+    set (OS_LINUX 1)
+    add_definitions(-D OS_LINUX)
+elseif (CMAKE_SYSTEM_NAME MATCHES "Darwin")
+    set (OS_MACOSX 1)
+    add_definitions(-D OS_MACOSX)
+endif ()
+
 if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
     set (COMPILER_GCC 1)
 elseif (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2a50dabf5c..d1f748d5b9 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -148,6 +148,13 @@ public:
                                                   doris::TStatusCode::VEC_CANNOT_MREMAP);
 
             /// No need for zero-fill, because mmap guarantees it.
+
+            if constexpr (mmap_populate) {
+                // MAP_POPULATE seems have no effect for mremap as for mmap,
+                // Clear enlarged memory range explicitly to pre-fault the pages
+                if (new_size > old_size)
+                    memset(reinterpret_cast<char*>(buf) + old_size, 0, new_size - old_size);
+            }
         } else if (new_size < MMAP_THRESHOLD) {
             /// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
             // CurrentMemoryTracker::realloc(old_size, new_size);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 05/05: [fix](materialized-view) fix bug that can not create mv for list partitioned table (#9281)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 9b2bd8b87b42f7baefcb2a513b3206497dc6bf4c
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 29 10:45:09 2022 +0800

    [fix](materialized-view) fix bug that can not create mv for list partitioned table (#9281)
---
 .../java/org/apache/doris/catalog/OlapTable.java   | 20 ++++++-----
 .../org/apache/doris/alter/AlterJobV2Test.java     | 41 +++++++++++++++++++++-
 .../doris/alter/MaterializedViewHandlerTest.java   |  5 ++-
 3 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index d23a4609a0..0c2c99ad97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -21,10 +21,10 @@ import org.apache.doris.alter.MaterializedViewHandler;
 import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.ColumnDef;
 import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
@@ -49,12 +49,12 @@ import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TOlapTable;
+import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
-import org.apache.doris.thrift.TSortType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -635,17 +635,19 @@ public class OlapTable extends Table {
         return partitionInfo;
     }
 
-    public Set<String> getPartitionColumnNames() {
+    public Set<String> getPartitionColumnNames() throws DdlException {
         Set<String> partitionColumnNames = Sets.newHashSet();
         if (partitionInfo instanceof SinglePartitionInfo) {
             return partitionColumnNames;
+        } else if (partitionInfo instanceof RangePartitionInfo) {
+            RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
+            return rangePartitionInfo.getPartitionColumns().stream().map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
+        } else if (partitionInfo instanceof ListPartitionInfo) {
+            ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
+            return listPartitionInfo.getPartitionColumns().stream().map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
+        } else {
+            throw new DdlException("Unknown partition info type: " + partitionInfo.getType().name());
         }
-        RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
-        List<Column> partitionColumns = rangePartitionInfo.getPartitionColumns();
-        for (Column column : partitionColumns) {
-            partitionColumnNames.add(column.getName().toLowerCase());
-        }
-        return partitionColumnNames;
     }
 
     public DistributionInfo getDefaultDistributionInfo() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
index 6a25b72892..2bb1842b36 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
@@ -19,6 +19,7 @@ package org.apache.doris.alter;
 
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.ShowAlterStmt;
 import org.apache.doris.catalog.Catalog;
@@ -85,6 +86,11 @@ public class AlterJobV2Test {
         Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
     }
 
+    private static void createMaterializedView(String sql) throws Exception {
+        CreateMaterializedViewStmt stmt = (CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processCreateMaterializedView(stmt);
+    }
+
     @Test
     public void testSchemaChange() throws Exception {
         // 1. process a schema change job
@@ -138,7 +144,7 @@ public class AlterJobV2Test {
         System.out.println(showResultSet.getMetaData());
         System.out.println(showResultSet.getResultRows());
     }
-    
+
     @Test
     @Deprecated
     public void testAlterSegmentV2() throws Exception {
@@ -217,4 +223,37 @@ public class AlterJobV2Test {
         waitAlterJobDone(alterJobs);
         ExceptionChecker.expectThrowsNoException(() -> alterTable("alter table test.dup_table modify column v2 varchar(2);"));
     }
+
+    @Test
+    public void testCreateMVForListPartitionTable() throws Exception {
+        createTable("CREATE TABLE test.list_tbl (\n" +
+                "city VARCHAR(20) NOT NULL,\n" +
+                "user_id BIGINT NOT NULL,\n" +
+                "date DATE NOT NULL,\n" +
+                "age SMALLINT NOT NULL,\n" +
+                "sex TINYINT NOT NULL,\n" +
+                "cost BIGINT NOT NULL DEFAULT \"0\"\n" +
+                ") DUPLICATE KEY(city) PARTITION BY LIST(city) (\n" +
+                "PARTITION p_bj\n" +
+                "VALUES IN (\"beijing\"),\n" +
+                "PARTITION p_gz\n" +
+                "VALUES IN (\"guangzhou\"),\n" +
+                "PARTITION p_sz\n" +
+                "VALUES IN (\"shenzhen\")\n" +
+                ") DISTRIBUTED BY HASH(date) BUCKETS 1 PROPERTIES(\"replication_num\" = \"1\");");
+
+        createMaterializedView("create materialized view list_view as\n" +
+                "select city,\n" +
+                "user_id,\n" +
+                "date,\n" +
+                "sum(cost)\n" +
+                "from\n" +
+                "test.list_tbl\n" +
+                "group by\n" +
+                "city,\n" +
+                "user_id,\n" +
+                "date;");
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        waitAlterJobDone(alterJobs);
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/MaterializedViewHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/MaterializedViewHandlerTest.java
index e33f840ffa..b852d64785 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/alter/MaterializedViewHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/alter/MaterializedViewHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 
 import com.google.common.collect.Lists;
@@ -272,10 +273,9 @@ public class MaterializedViewHandlerTest {
         }
     }
 
-
     @Test
     public void checkInvalidPartitionKeyMV(@Injectable CreateMaterializedViewStmt createMaterializedViewStmt,
-                                           @Injectable OlapTable olapTable) {
+                                           @Injectable OlapTable olapTable) throws DdlException {
         final String mvName = "mv1";
         final String columnName1 = "k1";
         Column baseColumn1 = new Column(columnName1, Type.VARCHAR, true, null, "", "");
@@ -308,7 +308,6 @@ public class MaterializedViewHandlerTest {
         }
     }
 
-
     @Test
     public void testCheckDropMaterializedView(@Injectable OlapTable olapTable, @Injectable Partition partition,
                                               @Injectable MaterializedIndex materializedIndex) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 04/05: [fix](vectorized) Query get wrong result when ColumnDict concurrent predicate eval (#9270)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 3659b8bb90a97761d6d72e938113bdff86cc846e
Author: ZenoYang <co...@qq.com>
AuthorDate: Fri Apr 29 11:45:04 2022 +0800

    [fix](vectorized) Query get wrong result when ColumnDict concurrent predicate eval (#9270)
---
 be/src/olap/column_predicate.h                     |  4 +-
 be/src/olap/comparison_predicate.cpp               | 96 ++++------------------
 be/src/olap/comparison_predicate.h                 |  3 -
 be/src/olap/in_list_predicate.cpp                  | 37 +--------
 be/src/olap/in_list_predicate.h                    |  5 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  3 +-
 6 files changed, 22 insertions(+), 126 deletions(-)

diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 45f8fcfa7f..56f817d9f3 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -42,7 +42,7 @@ enum class PredicateType {
     GT = 5,
     GE = 6,
     IN_LIST = 7,
-    NO_IN_LIST = 8,
+    NOT_IN_LIST = 8,
     IS_NULL = 9,
     NOT_IS_NULL = 10,
     BF = 11, // BloomFilter
@@ -86,8 +86,6 @@ public:
     virtual void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const {};
     uint32_t column_id() const { return _column_id; }
 
-    virtual void set_dict_code_if_necessary(vectorized::IColumn& column) { }
-
 protected:
     uint32_t _column_id;
     bool _opposite;
diff --git a/be/src/olap/comparison_predicate.cpp b/be/src/olap/comparison_predicate.cpp
index 57549c349c..45a89f92ad 100644
--- a/be/src/olap/comparison_predicate.cpp
+++ b/be/src/olap/comparison_predicate.cpp
@@ -147,7 +147,7 @@ COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterPredicate, >)
 COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
 
 // todo(zeno) define interface in IColumn to simplify code
-#define COMPARISON_PRED_COLUMN_EVALUATE(CLASS, OP)                                                 \
+#define COMPARISON_PRED_COLUMN_EVALUATE(CLASS, OP, IS_RANGE)                                       \
     template <class T>                                                                             \
     void CLASS<T>::evaluate(vectorized::IColumn& column, uint16_t* sel, uint16_t* size) const {    \
         uint16_t new_size = 0;                                                                     \
@@ -163,11 +163,14 @@ COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
                     auto* nested_col_ptr = vectorized::check_and_get_column<                       \
                             vectorized::ColumnDictionary<vectorized::Int32>>(nested_col);          \
                     auto& data_array = nested_col_ptr->get_data();                                 \
+                    auto dict_code =                                                               \
+                            IS_RANGE ? nested_col_ptr->find_code_by_bound(_value, 0 OP 1, 1 OP 1)  \
+                                     : nested_col_ptr->find_code(_value);                          \
                     for (uint16_t i = 0; i < *size; i++) {                                         \
                         uint16_t idx = sel[i];                                                     \
                         sel[new_size] = idx;                                                       \
                         const auto& cell_value = data_array[idx];                                  \
-                        bool ret = !null_bitmap[idx] && (cell_value OP _dict_code);                \
+                        bool ret = !null_bitmap[idx] && (cell_value OP dict_code);                 \
                         new_size += _opposite ? !ret : ret;                                        \
                     }                                                                              \
                 }                                                                                  \
@@ -184,20 +187,20 @@ COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
                     new_size += _opposite ? !ret : ret;                                            \
                 }                                                                                  \
             }                                                                                      \
-            *size = new_size;                                                                      \
         } else if (column.is_column_dictionary()) {                                                \
             if constexpr (std::is_same_v<T, StringValue>) {                                        \
                 auto& dict_col =                                                                   \
                         reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(column);\
                 auto& data_array = dict_col.get_data();                                            \
+                auto dict_code = IS_RANGE ? dict_col.find_code_by_bound(_value, 0 OP 1, 1 OP 1)    \
+                                          : dict_col.find_code(_value);                            \
                 for (uint16_t i = 0; i < *size; ++i) {                                             \
                     uint16_t idx = sel[i];                                                         \
                     sel[new_size] = idx;                                                           \
                     const auto& cell_value = data_array[idx];                                      \
-                    bool ret = cell_value OP _dict_code;                                           \
+                    bool ret = cell_value OP dict_code;                                           \
                     new_size += _opposite ? !ret : ret;                                            \
                 }                                                                                  \
-                *size = new_size;                                                                  \
             }                                                                                      \
         } else {                                                                                   \
             auto& pred_column_ref =                                                                \
@@ -210,17 +213,17 @@ COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
                 auto ret = cell_value OP _value;                                                   \
                 new_size += _opposite ? !ret : ret;                                                \
             }                                                                                      \
-            *size = new_size;                                                                      \
         }                                                                                          \
+        *size = new_size;                                                                          \
     }
 
 
-COMPARISON_PRED_COLUMN_EVALUATE(EqualPredicate, ==)
-COMPARISON_PRED_COLUMN_EVALUATE(NotEqualPredicate, !=)
-COMPARISON_PRED_COLUMN_EVALUATE(LessPredicate, <)
-COMPARISON_PRED_COLUMN_EVALUATE(LessEqualPredicate, <=)
-COMPARISON_PRED_COLUMN_EVALUATE(GreaterPredicate, >)
-COMPARISON_PRED_COLUMN_EVALUATE(GreaterEqualPredicate, >=)
+COMPARISON_PRED_COLUMN_EVALUATE(EqualPredicate, ==, false)
+COMPARISON_PRED_COLUMN_EVALUATE(NotEqualPredicate, !=, false)
+COMPARISON_PRED_COLUMN_EVALUATE(LessPredicate, <, true)
+COMPARISON_PRED_COLUMN_EVALUATE(LessEqualPredicate, <=, true)
+COMPARISON_PRED_COLUMN_EVALUATE(GreaterPredicate, >, true)
+COMPARISON_PRED_COLUMN_EVALUATE(GreaterEqualPredicate, >=, true)
 
 #define COMPARISON_PRED_COLUMN_EVALUATE_VEC(CLASS, OP)                                         \
     template <class T>                                                                         \
@@ -502,65 +505,6 @@ COMPARISON_PRED_BITMAP_EVALUATE(LessEqualPredicate, <=)
 COMPARISON_PRED_BITMAP_EVALUATE(GreaterPredicate, >)
 COMPARISON_PRED_BITMAP_EVALUATE(GreaterEqualPredicate, >=)
 
-
-#define COMPARISON_PRED_SET_DICT_CODE(CLASS)                                                   \
-    template <class T>                                                                         \
-    void CLASS<T>::set_dict_code_if_necessary(vectorized::IColumn& column) {                   \
-        if (_dict_code_inited) {                                                               \
-            return;                                                                            \
-        }                                                                                      \
-        if constexpr (std::is_same_v<T, StringValue>) {                                        \
-            auto* col_ptr = column.get_ptr().get();                                            \
-            if (column.is_nullable()) {                                                        \
-                auto nullable_col =                                                            \
-                        reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);                \
-                col_ptr = nullable_col->get_nested_column_ptr().get();                         \
-            }                                                                                  \
-            if (col_ptr->is_column_dictionary()) {                                             \
-                auto& dict_col =                                                               \
-                        reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(    \
-                                *col_ptr);                                                     \
-                _dict_code = dict_col.find_code(_value);                                       \
-                _dict_code_inited = true;                                                      \
-            }                                                                                  \
-        }                                                                                      \
-    }
-
-COMPARISON_PRED_SET_DICT_CODE(EqualPredicate)
-COMPARISON_PRED_SET_DICT_CODE(NotEqualPredicate)
-
-// If 1 OP 0 returns true, it means the predicate is > or >=
-// If 1 OP 1 returns true, it means the predicate is >= or <=
-// by this way, avoid redundant code
-#define RAMGE_COMPARISON_PRED_SET_DICT_CODE(CLASS, OP)                                         \
-    template <class T>                                                                         \
-    void CLASS<T>::set_dict_code_if_necessary(vectorized::IColumn& column) {                   \
-        if (_dict_code_inited) {                                                               \
-            return;                                                                            \
-        }                                                                                      \
-        if constexpr (std::is_same_v<T, StringValue>) {                                        \
-            auto* col_ptr = column.get_ptr().get();                                            \
-            if (column.is_nullable()) {                                                        \
-                auto nullable_col =                                                            \
-                        reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);                \
-                col_ptr = nullable_col->get_nested_column_ptr().get();                         \
-            }                                                                                  \
-                                                                                               \
-            if (col_ptr->is_column_dictionary()) {                                             \
-                auto& dict_col =                                                               \
-                        reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(    \
-                                *col_ptr);                                                     \
-                _dict_code = dict_col.find_code_by_bound(_value, 1 OP 0, 1 OP 1);              \
-                _dict_code_inited = true;                                                      \
-            }                                                                                  \
-        }                                                                                      \
-    }
-
-RAMGE_COMPARISON_PRED_SET_DICT_CODE(LessPredicate, <)
-RAMGE_COMPARISON_PRED_SET_DICT_CODE(LessEqualPredicate, <=)
-RAMGE_COMPARISON_PRED_SET_DICT_CODE(GreaterPredicate, >)
-RAMGE_COMPARISON_PRED_SET_DICT_CODE(GreaterEqualPredicate, >=)
-
 #define COMPARISON_PRED_CONSTRUCTOR_DECLARATION(CLASS)                                         \
     template CLASS<int8_t>::CLASS(uint32_t column_id, const int8_t& value, bool opposite);     \
     template CLASS<int16_t>::CLASS(uint32_t column_id, const int16_t& value, bool opposite);   \
@@ -745,14 +689,4 @@ COMPARISON_PRED_COLUMN_EVALUATE_VEC_DECLARATION(LessEqualPredicate)
 COMPARISON_PRED_COLUMN_EVALUATE_VEC_DECLARATION(GreaterPredicate)
 COMPARISON_PRED_COLUMN_EVALUATE_VEC_DECLARATION(GreaterEqualPredicate)
 
-#define COMPARISON_PRED_SET_DICT_CODE_DECLARATION(CLASS) \
-template void CLASS<StringValue>::set_dict_code_if_necessary(vectorized::IColumn& column);
-
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(EqualPredicate)
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(NotEqualPredicate)
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(LessPredicate)
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(LessEqualPredicate)
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(GreaterPredicate)
-COMPARISON_PRED_SET_DICT_CODE_DECLARATION(GreaterEqualPredicate)
-
 } //namespace doris
diff --git a/be/src/olap/comparison_predicate.h b/be/src/olap/comparison_predicate.h
index 3df31c370d..7e7f718a76 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -47,11 +47,8 @@ class VectorizedRowBatch;
         void evaluate_or(vectorized::IColumn& column, uint16_t* sel, uint16_t size,                \
                          bool* flags) const override;                                              \
         void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* flags) const override; \
-        void set_dict_code_if_necessary(vectorized::IColumn& column) override;                     \
     private:                                                                                       \
         T _value;                                                                                  \
-        bool _dict_code_inited = false;                                                            \
-        int32_t _dict_code;                                                                        \
     };
 
 COMPARISON_PRED_CLASS_DEFINE(EqualPredicate, EQ)
diff --git a/be/src/olap/in_list_predicate.cpp b/be/src/olap/in_list_predicate.cpp
index 0401673926..9b78a8705f 100644
--- a/be/src/olap/in_list_predicate.cpp
+++ b/be/src/olap/in_list_predicate.cpp
@@ -134,12 +134,13 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(NotInListPredicate, ==)
                     auto* nested_col_ptr = vectorized::check_and_get_column<                       \
                             vectorized::ColumnDictionary<vectorized::Int32>>(nested_col);          \
                     auto& data_array = nested_col_ptr->get_data();                                 \
+                    auto dict_codes = nested_col_ptr->find_codes(_values);                         \
                     for (uint16_t i = 0; i < *size; i++) {                                         \
                         uint16_t idx = sel[i];                                                     \
                         sel[new_size] = idx;                                                       \
                         const auto& cell_value = data_array[idx];                                  \
                         bool ret = !null_bitmap[idx]                                               \
-                                   && (_dict_codes.find(cell_value) OP _dict_codes.end());         \
+                                   && (dict_codes.find(cell_value) OP dict_codes.end());           \
                         new_size += _opposite ? !ret : ret;                                        \
                     }                                                                              \
                 }                                                                                  \
@@ -155,18 +156,18 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(NotInListPredicate, ==)
                     new_size += _opposite ? !ret : ret;                                            \
                 }                                                                                  \
             }                                                                                      \
-            *size = new_size;                                                                      \
         } else if (column.is_column_dictionary()) {                                                \
             if constexpr (std::is_same_v<T, StringValue>) {                                        \
                 auto& dict_col =                                                                   \
                         reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(        \
                                 column);                                                           \
                 auto& data_array = dict_col.get_data();                                            \
+                auto dict_codes = dict_col.find_codes(_values);                                    \
                 for (uint16_t i = 0; i < *size; i++) {                                             \
                     uint16_t idx = sel[i];                                                         \
                     sel[new_size] = idx;                                                           \
                     const auto& cell_value = data_array[idx];                                      \
-                    auto result = (_dict_codes.find(cell_value) OP _dict_codes.end());             \
+                    auto result = (dict_codes.find(cell_value) OP dict_codes.end());               \
                     new_size += _opposite ? !result : result;                                      \
                 }                                                                                  \
             }                                                                                      \
@@ -282,32 +283,6 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_AND(NotInListPredicate, ==)
 IN_LIST_PRED_BITMAP_EVALUATE(InListPredicate, &=)
 IN_LIST_PRED_BITMAP_EVALUATE(NotInListPredicate, -=)
 
-#define IN_LIST_PRED_SET_DICT_CODE(CLASS)                                                      \
-    template <class T>                                                                         \
-    void CLASS<T>::set_dict_code_if_necessary(vectorized::IColumn& column) {                   \
-        if (_dict_code_inited) {                                                               \
-            return;                                                                            \
-        }                                                                                      \
-        if constexpr (std::is_same_v<T, StringValue>) {                                        \
-            auto* col_ptr = column.get_ptr().get();                                            \
-            if (column.is_nullable()) {                                                        \
-                auto nullable_col =                                                            \
-                        reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);                \
-                col_ptr = nullable_col->get_nested_column_ptr().get();                         \
-            }                                                                                  \
-            if (col_ptr->is_column_dictionary()) {                                             \
-                auto& dict_col =                                                               \
-                        reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(    \
-                                *col_ptr);                                                     \
-                _dict_codes = dict_col.find_codes(_values);                                    \
-                _dict_code_inited = true;                                                      \
-            }                                                                                  \
-        }                                                                                      \
-    }
-
-IN_LIST_PRED_SET_DICT_CODE(InListPredicate)
-IN_LIST_PRED_SET_DICT_CODE(NotInListPredicate)
-
 #define IN_LIST_PRED_CONSTRUCTOR_DECLARATION(CLASS)                                                \
     template CLASS<int8_t>::CLASS(uint32_t column_id, phmap::flat_hash_set<int8_t>&& values,       \
                                   bool opposite);                                                  \
@@ -415,8 +390,4 @@ IN_LIST_PRED_COLUMN_BLOCK_EVALUATE_DECLARATION(NotInListPredicate)
 IN_LIST_PRED_BITMAP_EVALUATE_DECLARATION(InListPredicate)
 IN_LIST_PRED_BITMAP_EVALUATE_DECLARATION(NotInListPredicate)
 
-template void InListPredicate<StringValue>::set_dict_code_if_necessary(vectorized::IColumn& column);
-template void NotInListPredicate<StringValue>::set_dict_code_if_necessary(
-        vectorized::IColumn& column);
-
 } //namespace doris
diff --git a/be/src/olap/in_list_predicate.h b/be/src/olap/in_list_predicate.h
index 089ee84a06..33682b6824 100644
--- a/be/src/olap/in_list_predicate.h
+++ b/be/src/olap/in_list_predicate.h
@@ -96,15 +96,12 @@ class VectorizedRowBatch;
         void evaluate(vectorized::IColumn& column, uint16_t* sel, uint16_t* size) const override; \
         void evaluate_and(vectorized::IColumn& column, uint16_t* sel, uint16_t size, bool* flags) const override {} \
         void evaluate_or(vectorized::IColumn& column, uint16_t* sel, uint16_t size, bool* flags) const override {} \
-        void set_dict_code_if_necessary(vectorized::IColumn& column) override;                    \
     private:                                                                                      \
         phmap::flat_hash_set<T> _values;                                                          \
-        bool _dict_code_inited = false;                                                           \
-        phmap::flat_hash_set<int32_t> _dict_codes;                                                \
     };
 
 IN_LIST_PRED_CLASS_DEFINE(InListPredicate, IN_LIST)
-IN_LIST_PRED_CLASS_DEFINE(NotInListPredicate, NO_IN_LIST)
+IN_LIST_PRED_CLASS_DEFINE(NotInListPredicate, NOT_IN_LIST)
 
 } //namespace doris
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 52cc584fdb..9484601f12 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -634,7 +634,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
             if (type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
                 type == OLAP_FIELD_TYPE_STRING || predicate->type() == PredicateType::BF ||
                 predicate->type() == PredicateType::IN_LIST ||
-                predicate->type() == PredicateType::NO_IN_LIST) {
+                predicate->type() == PredicateType::NOT_IN_LIST) {
                 short_cir_pred_col_id_set.insert(cid);
                 _short_cir_eval_predicate.push_back(predicate);
                 _is_all_column_basic_type = false;
@@ -867,7 +867,6 @@ void SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_
             predicate->type() == PredicateType::GT || predicate->type() == PredicateType::GE) {
             col_ptr->convert_dict_codes_if_necessary();
         }
-        predicate->set_dict_code_if_necessary(*short_cir_column);
         predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size_ptr);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/05: [Bug] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1cda120c8c885acf6abda0468091fb762dc6363a
Author: morningman <mo...@163.com>
AuthorDate: Sat Apr 30 00:55:28 2022 +0800

    [Bug] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)
---
 be/src/runtime/data_stream_recvr.cc       | 2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index 962395ad26..f38049dc6f 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -197,7 +197,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {
 void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_number,
                                              int64_t packet_seq,
                                              ::google::protobuf::Closure** done) {
-    unique_lock<mutex> l(_lock);
+    lock_guard<mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index e3eb3d4e8f..24244312cc 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -90,7 +90,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
 void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
                                               int64_t packet_seq,
                                               ::google::protobuf::Closure** done) {
-    std::unique_lock<std::mutex> l(_lock);
+    std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -140,6 +140,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
 }
 
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
+    std::unique_lock<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -158,8 +159,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     }
     materialize_block_inplace(*nblock);
 
-
-    std::unique_lock<std::mutex> l(_lock);
     size_t block_size = nblock->bytes();
     _block_queue.emplace_back(block_size, nblock);
     _recvr->_mem_tracker->Consume(nblock->bytes());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org