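You are viewing a plain-text version of this message. The canonical version is the original message in the commits@doris.apache.org mailing-list archive (its hyperlink was lost in this plain-text rendering).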
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/27 05:58:25 UTC

[doris] branch master updated: [improvement]Use phmap for aggregation with integer keys (#11175)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b74f36e009 [improvement]Use phmap for aggregation with integer keys (#11175)
b74f36e009 is described below

commit b74f36e009880b0afd7a87b0c7171f08708c88f2
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jul 27 13:58:20 2022 +0800

    [improvement]Use phmap for aggregation with integer keys (#11175)
---
 be/src/vec/common/columns_hashing_impl.h        |   1 +
 be/src/vec/common/hash_table/hash.h             |   6 ++
 be/src/vec/common/hash_table/hash_table_utils.h |  25 +++++
 be/src/vec/common/hash_table/ph_hash_map.h      |  56 ++++++----
 be/src/vec/exec/vaggregation_node.cpp           | 137 ++++++++++++++++++------
 be/src/vec/exec/vaggregation_node.h             | 132 +++++++++++++++++++----
 6 files changed, 285 insertions(+), 72 deletions(-)

diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h
index 8aa1b1e96d..2abfa3b8e8 100644
--- a/be/src/vec/common/columns_hashing_impl.h
+++ b/be/src/vec/common/columns_hashing_impl.h
@@ -25,6 +25,7 @@
 #include "vec/common/aggregation_common.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/hash_table/hash_table_key_holder.h"
+#include "vec/common/hash_table/ph_hash_map.h"
 // #include <Interpreters/AggregationCommon.h>
 
 namespace doris::vectorized {
diff --git a/be/src/vec/common/hash_table/hash.h b/be/src/vec/common/hash_table/hash.h
index fb47657809..ab99563625 100644
--- a/be/src/vec/common/hash_table/hash.h
+++ b/be/src/vec/common/hash_table/hash.h
@@ -22,6 +22,7 @@
 
 #include <type_traits>
 
+#include "parallel_hashmap/phmap_utils.h"
 #include "vec/common/uint128.h"
 #include "vec/core/types.h"
 
@@ -140,6 +141,11 @@ DEFINE_HASH(doris::vectorized::Float64)
 
 #undef DEFINE_HASH
 
+template <typename Key, typename Hash = HashCRC32<Key>>
+struct HashMixWrapper {
+    size_t operator()(Key key) const { return phmap::phmap_mix<sizeof(size_t)>()(Hash()(key)); }
+};
+
 template <>
 struct HashCRC32<doris::vectorized::UInt256> {
     size_t operator()(const doris::vectorized::UInt256& x) const {
diff --git a/be/src/vec/common/hash_table/hash_table_utils.h b/be/src/vec/common/hash_table/hash_table_utils.h
new file mode 100644
index 0000000000..e437f07099
--- /dev/null
+++ b/be/src/vec/common/hash_table/hash_table_utils.h
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/HashTable.h
+// and modified by Doris
+
+template <typename T>
+struct HashTableTraits {
+    static constexpr bool is_phmap = false;
+    static constexpr bool is_parallel_phmap = false;
+};
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h
index 2883f58f17..5f06f6a09f 100644
--- a/be/src/vec/common/hash_table/ph_hash_map.h
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -19,26 +19,28 @@
 
 #include <parallel_hashmap/phmap.h>
 
-ALWAYS_INLINE inline char** lookup_result_get_mapped(std::pair<const StringRef, char*>* it) {
+#include "vec/common/hash_table/hash.h"
+#include "vec/common/hash_table/hash_table_utils.h"
+
+template <typename Key, typename Mapped>
+ALWAYS_INLINE inline auto lookup_result_get_mapped(std::pair<const Key, Mapped>* it) {
     return &(it->second);
 }
 
-template <typename Map>
-struct IsPhmapTraits {
-    constexpr static bool value = false;
-};
-
-template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          bool use_parallel = false>
 class PHHashMap : private boost::noncopyable {
 public:
     using Self = PHHashMap;
-    using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>;
+    using HashMapImpl =
+            std::conditional_t<use_parallel, phmap::parallel_flat_hash_map<Key, Mapped, Hash>,
+                               phmap::flat_hash_map<Key, Mapped, Hash>>;
 
     using key_type = Key;
     using mapped_type = Mapped;
-    using value_type = Key;
+    using value_type = std::pair<const Key, Mapped>;
 
-    using LookupResult = typename HashMapImpl::value_type*;
+    using LookupResult = std::pair<const Key, Mapped>*;
 
     using const_iterator_impl = typename HashMapImpl::const_iterator;
     using iterator_impl = typename HashMapImpl::iterator;
@@ -121,17 +123,30 @@ public:
                                bool& inserted) {
         const auto& key = key_holder_get_key(key_holder);
         inserted = false;
-        auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
-            inserted = true;
-            key_holder_persist_key(key_holder);
-            ctor(key, nullptr);
-        });
-        it = &*it_;
+        if constexpr (use_parallel) {
+            auto it_ = _hash_map.lazy_emplace_with_hash(hash_value, key, [&](const auto& ctor) {
+                inserted = true;
+                key_holder_persist_key(key_holder);
+                ctor(key, nullptr);
+            });
+            it = &*it_;
+        } else {
+            auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
+                inserted = true;
+                key_holder_persist_key(key_holder);
+                ctor(key, nullptr);
+            });
+            it = &*it_;
+        }
     }
 
     size_t hash(const Key& x) const { return _hash_map.hash(x); }
 
-    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); }
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
+        if constexpr (!use_parallel) _hash_map.prefetch_hash(hash_value);
+    }
+
+    void ALWAYS_INLINE prefetch_by_key(Key key) { _hash_map.prefetch(key); }
 
     /// Call func(const Key &, Mapped &) for each hash map element.
     template <typename Func>
@@ -164,7 +179,8 @@ public:
     HashMapImpl _hash_map;
 };
 
-template <typename Key, typename Mapped, typename Hash>
-struct IsPhmapTraits<PHHashMap<Key, Mapped, Hash>> {
-    constexpr static bool value = true;
+template <typename Key, typename Mapped, typename Hash, bool use_parallel>
+struct HashTableTraits<PHHashMap<Key, Mapped, Hash, use_parallel>> {
+    static constexpr bool is_phmap = true;
+    static constexpr bool is_parallel_phmap = use_parallel;
 };
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 3301a7ed32..e7008957eb 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -142,16 +142,28 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
-            _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable);
+            if (_is_merge)
+                _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+            else
+                _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable);
             return;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
         case TYPE_DATE:
         case TYPE_DATETIME:
         case TYPE_DATETIMEV2:
-            _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable);
+            if (_is_merge)
+                _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+            else
+                _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable);
             return;
-        case TYPE_LARGEINT:
+        case TYPE_LARGEINT: {
+            if (_is_merge)
+                _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+            else
+                _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable);
+            return;
+        }
         case TYPE_DECIMALV2:
         case TYPE_DECIMAL32:
         case TYPE_DECIMAL64:
@@ -163,11 +175,20 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
                                         : type_ptr->get_type_id();
             WhichDataType which(idx);
             if (which.is_decimal32()) {
-                _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable);
+                if (_is_merge)
+                    _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+                else
+                    _agg_data.init(AggregatedDataVariants::Type::int32_key, is_nullable);
             } else if (which.is_decimal64()) {
-                _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable);
+                if (_is_merge)
+                    _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+                else
+                    _agg_data.init(AggregatedDataVariants::Type::int64_key, is_nullable);
             } else {
-                _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable);
+                if (_is_merge)
+                    _agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
+                else
+                    _agg_data.init(AggregatedDataVariants::Type::int128_key, is_nullable);
             }
             return;
         }
@@ -208,20 +229,38 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         if (use_fixed_key) {
             if (has_null) {
                 if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) {
-                    _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
                 } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
                            sizeof(UInt128)) {
-                    _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
                 } else {
-                    _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
                 }
             } else {
                 if (key_byte_size <= sizeof(UInt64)) {
-                    _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
                 } else if (key_byte_size <= sizeof(UInt128)) {
-                    _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
                 } else {
-                    _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                    if (_is_merge)
+                        _agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
+                    else
+                        _agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
                 }
             }
         } else {
@@ -794,10 +833,18 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                     std::vector<size_t> hash_values;
 
-                    if constexpr (IsPhmapTraits<HashTableType>::value) {
+                    if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                         if (hash_values.size() < rows) hash_values.resize(rows);
-                        for (size_t i = 0; i < rows; ++i) {
-                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                              AggState>::value) {
+                            for (size_t i = 0; i < rows; ++i) {
+                                hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                            }
+                        } else {
+                            for (size_t i = 0; i < rows; ++i) {
+                                hash_values[i] = agg_method.data.hash(
+                                        state.get_key_holder(i, _agg_arena_pool));
+                            }
                         }
                     }
 
@@ -806,10 +853,15 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                         AggregateDataPtr aggregate_data = nullptr;
 
                         auto emplace_result = [&]() {
-                            if constexpr (IsPhmapTraits<HashTableType>::value) {
+                            if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                                 if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
-                                    agg_method.data.prefetch_by_hash(
-                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                    if constexpr (HashTableTraits<
+                                                          HashTableType>::is_parallel_phmap) {
+                                        agg_method.data.prefetch_by_key(state.get_key_holder(
+                                                i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
+                                    } else
+                                        agg_method.data.prefetch_by_hash(
+                                                hash_values[i + HASH_MAP_PREFETCH_DIST]);
                                 }
 
                                 return state.emplace_key(agg_method.data, hash_values[i], i,
@@ -879,10 +931,18 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
 
                 std::vector<size_t> hash_values;
 
-                if constexpr (IsPhmapTraits<HashTableType>::value) {
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                     if (hash_values.size() < rows) hash_values.resize(rows);
-                    for (size_t i = 0; i < rows; ++i) {
-                        hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                          AggState>::value) {
+                        for (size_t i = 0; i < rows; ++i) {
+                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    } else {
+                        for (size_t i = 0; i < rows; ++i) {
+                            hash_values[i] =
+                                    agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+                        }
                     }
                 }
 
@@ -891,10 +951,14 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
                     AggregateDataPtr aggregate_data = nullptr;
 
                     auto emplace_result = [&]() {
-                        if constexpr (IsPhmapTraits<HashTableType>::value) {
+                        if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                             if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
-                                agg_method.data.prefetch_by_hash(
-                                        hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) {
+                                    agg_method.data.prefetch_by_key(state.get_key_holder(
+                                            i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
+                                } else
+                                    agg_method.data.prefetch_by_hash(
+                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
                             }
 
                             return state.emplace_key(agg_method.data, hash_values[i], i,
@@ -1141,10 +1205,18 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
 
                 std::vector<size_t> hash_values;
 
-                if constexpr (IsPhmapTraits<HashTableType>::value) {
+                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                     if (hash_values.size() < rows) hash_values.resize(rows);
-                    for (size_t i = 0; i < rows; ++i) {
-                        hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
+                                          AggState>::value) {
+                        for (size_t i = 0; i < rows; ++i) {
+                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    } else {
+                        for (size_t i = 0; i < rows; ++i) {
+                            hash_values[i] =
+                                    agg_method.data.hash(state.get_key_holder(i, _agg_arena_pool));
+                        }
                     }
                 }
 
@@ -1153,12 +1225,15 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
                     AggregateDataPtr aggregate_data = nullptr;
 
                     auto emplace_result = [&]() {
-                        if constexpr (IsPhmapTraits<HashTableType>::value) {
+                        if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                             if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
-                                agg_method.data.prefetch_by_hash(
-                                        hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                if constexpr (HashTableTraits<HashTableType>::is_parallel_phmap) {
+                                    agg_method.data.prefetch_by_key(state.get_key_holder(
+                                            i + HASH_MAP_PREFETCH_DIST, _agg_arena_pool));
+                                } else
+                                    agg_method.data.prefetch_by_hash(
+                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
                             }
-
                             return state.emplace_key(agg_method.data, hash_values[i], i,
                                                      _agg_arena_pool);
                         } else {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index fd6b67f255..53eecb025f 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -157,7 +157,7 @@ struct AggregationMethodStringNoCache {
 
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
-template <typename FieldType, typename TData, bool consecutive_keys_optimization = true>
+template <typename FieldType, typename TData, bool consecutive_keys_optimization = false>
 struct AggregationMethodOneNumber {
     using Data = TData;
     using Key = typename Data::key_type;
@@ -247,7 +247,7 @@ struct AggregationMethodKeysFixed {
     AggregationMethodKeysFixed(const Other& other) : data(other.data) {}
 
     using State = ColumnsHashing::HashMethodKeysFixed<typename Data::value_type, Key, Mapped,
-                                                      has_nullable_keys>;
+                                                      has_nullable_keys, false>;
 
     static void insert_key_into_columns(const Key& key, MutableColumns& key_columns,
                                         const Sizes& key_sizes) {
@@ -356,38 +356,61 @@ struct AggregationMethodSingleNullableColumn : public SingleColumnMethod {
 using AggregatedDataWithUInt8Key =
         FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
 using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>;
-using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
-using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
-using AggregatedDataWithUInt128Key = HashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>;
-using AggregatedDataWithUInt256Key = HashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>;
+using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
+using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
+using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>;
+using AggregatedDataWithUInt256Key = PHHashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>;
+using AggregatedDataWithUInt32KeyPhase2 =
+        PHHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>;
+using AggregatedDataWithUInt64KeyPhase2 =
+        PHHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>;
+using AggregatedDataWithUInt128KeyPhase2 =
+        PHHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>;
+using AggregatedDataWithUInt256KeyPhase2 =
+        PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>;
 
 using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
 using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
 using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey<AggregatedDataWithUInt32Key>;
 using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
+using AggregatedDataWithNullableUInt32KeyPhase2 =
+        AggregationDataWithNullKey<AggregatedDataWithUInt32KeyPhase2>;
+using AggregatedDataWithNullableUInt64KeyPhase2 =
+        AggregationDataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
 using AggregatedDataWithNullableShortStringKey =
         AggregationDataWithNullKey<AggregatedDataWithShortStringKey>;
 using AggregatedDataWithNullableUInt128Key =
         AggregationDataWithNullKey<AggregatedDataWithUInt128Key>;
+using AggregatedDataWithNullableUInt128KeyPhase2 =
+        AggregationDataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;
 
 using AggregatedMethodVariants = std::variant<
         AggregationMethodSerialized<AggregatedDataWithStringKey>,
-        AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>,
-        AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>,
+        AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>,
+        AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>,
         AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>,
         AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>,
         AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>,
         AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>,
+        AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>,
+        AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>,
+        AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>,
         AggregationMethodSingleNullableColumn<
-                AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>,
+                AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>,
         AggregationMethodSingleNullableColumn<
-                AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>,
+                AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>,
         AggregationMethodSingleNullableColumn<
                 AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>,
         AggregationMethodSingleNullableColumn<
                 AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>,
+        AggregationMethodSingleNullableColumn<
+                AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>,
+        AggregationMethodSingleNullableColumn<
+                AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>,
         AggregationMethodSingleNullableColumn<
                 AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>,
+        AggregationMethodSingleNullableColumn<
+                AggregationMethodOneNumber<UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>,
         AggregationMethodSingleNullableColumn<
                 AggregationMethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
         AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>,
@@ -395,7 +418,13 @@ using AggregatedMethodVariants = std::variant<
         AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>,
         AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, true>,
         AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>,
-        AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>>;
+        AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, true>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
+        AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>;
 
 struct AggregatedDataVariants {
     AggregatedDataVariants() = default;
@@ -412,11 +441,17 @@ struct AggregatedDataVariants {
         int8_key,
         int16_key,
         int32_key,
+        int32_key_phase2,
         int64_key,
+        int64_key_phase2,
         int128_key,
+        int128_key_phase2,
         int64_keys,
+        int64_keys_phase2,
         int128_keys,
+        int128_keys_phase2,
         int256_keys,
+        int256_keys_phase2,
         string_key,
     };
 
@@ -433,22 +468,20 @@ struct AggregatedDataVariants {
             break;
         case Type::int8_key:
             if (is_nullable) {
-                _aggregated_method_variant
-                        .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
-                                UInt8, AggregatedDataWithNullableUInt8Key, false>>>();
+                _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
+                        AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>>();
             } else {
-                _aggregated_method_variant.emplace<
-                        AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>>();
+                _aggregated_method_variant
+                        .emplace<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>();
             }
             break;
         case Type::int16_key:
             if (is_nullable) {
-                _aggregated_method_variant
-                        .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
-                                UInt16, AggregatedDataWithNullableUInt16Key, false>>>();
+                _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
+                        AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>>();
             } else {
-                _aggregated_method_variant.emplace<
-                        AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>>();
+                _aggregated_method_variant
+                        .emplace<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>();
             }
             break;
         case Type::int32_key:
@@ -460,6 +493,16 @@ struct AggregatedDataVariants {
                         .emplace<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32Key>>();
             }
             break;
+        case Type::int32_key_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant
+                        .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
+                                UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>>();
+            }
+            break;
         case Type::int64_key:
             if (is_nullable) {
                 _aggregated_method_variant.emplace<AggregationMethodSingleNullableColumn<
@@ -469,6 +512,16 @@ struct AggregatedDataVariants {
                         .emplace<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>();
             }
             break;
+        case Type::int64_key_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant
+                        .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
+                                UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>>();
+            }
+            break;
         case Type::int128_key:
             if (is_nullable) {
                 _aggregated_method_variant
@@ -479,6 +532,16 @@ struct AggregatedDataVariants {
                         AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128Key>>();
             }
             break;
+        case Type::int128_key_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant
+                        .emplace<AggregationMethodSingleNullableColumn<AggregationMethodOneNumber<
+                                UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>>();
+            }
+            break;
         case Type::int64_keys:
             if (is_nullable) {
                 _aggregated_method_variant
@@ -488,6 +551,15 @@ struct AggregatedDataVariants {
                         .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key, false>>();
             }
             break;
+        case Type::int64_keys_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>>();
+            }
+            break;
         case Type::int128_keys:
             if (is_nullable) {
                 _aggregated_method_variant
@@ -497,6 +569,15 @@ struct AggregatedDataVariants {
                         .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt128Key, false>>();
             }
             break;
+        case Type::int128_keys_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>>();
+            }
+            break;
         case Type::int256_keys:
             if (is_nullable) {
                 _aggregated_method_variant
@@ -506,6 +587,15 @@ struct AggregatedDataVariants {
                         .emplace<AggregationMethodKeysFixed<AggregatedDataWithUInt256Key, false>>();
             }
             break;
+        case Type::int256_keys_phase2:
+            if (is_nullable) {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>>();
+            } else {
+                _aggregated_method_variant.emplace<
+                        AggregationMethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>>();
+            }
+            break;
         case Type::string_key:
             if (is_nullable) {
                 _aggregated_method_variant.emplace<


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org