Posted to commits@doris.apache.org by ga...@apache.org on 2022/10/21 01:59:40 UTC

[doris] branch master updated: [Improvement](join) remove unnecessary state for join (#13472)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f65aa746 [Improvement](join) remove unnecessary state for join (#13472)
d3f65aa746 is described below

commit d3f65aa746c7ab27c72a39b8d4869dd6f83979ba
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Oct 21 09:59:34 2022 +0800

    [Improvement](join) remove unnecessary state for join (#13472)
---
 be/src/vec/exec/join/join_op.h           | 243 +++++---
 be/src/vec/exec/join/vhash_join_node.cpp | 947 +++++++++++++++++--------------
 be/src/vec/exec/join/vhash_join_node.h   | 116 ++--
 be/src/vec/exec/vset_operation_node.cpp  |  32 +-
 be/src/vec/exec/vset_operation_node.h    | 133 +++--
 5 files changed, 858 insertions(+), 613 deletions(-)

diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 5c50238592..c760e6da9a 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -22,111 +22,210 @@
 #include "vec/core/block.h"
 
 namespace doris::vectorized {
-/// Reference to the row in block.
+/**
+ * Now we have different kinds of RowRef for join operations. Overall, RowRef is the base class and
+ * the class hierarchy is shown below:
+ *                                         RowRef
+ *                                           |
+ *          ---------------------------------------------------------
+ *          |                          |                            |
+ *   RowRefListWithFlag           RowRefList                 RowRefWithFlag
+ *                                                                 |
+ *                                                         RowRefListWithFlags
+ *
+ *  RowRef is a basic representation of a row, containing only row_num and block_offset.
+ *
+ *  RowRefList is a list of RowRefs. It is used for join operations that do not need any flag to represent whether a row has already been visited.
+ *
+ *  RowRefListWithFlag is a list of RowRefs with a single extra visited flag. It is used for join operations in which all RowRefs in a list share the same visited flag.
+ *
+ *  RowRefWithFlag is a basic representation of a row with an extra visited flag.
+ *
+ *  RowRefListWithFlags is a list of RowRefWithFlags, so each row carries its own visited flag. It is used for join operations that have `other_conjuncts`.
+ */
 struct RowRef {
     using SizeT = uint32_t; /// Do not use size_t cause of memory economy
 
     SizeT row_num = 0;
     uint8_t block_offset;
-    // Use in right join to mark row is visited
-    // TODO: opt the variable to use it only need
-    bool visited = false;
 
-    RowRef() {}
-    RowRef(size_t row_num_count, uint8_t block_offset_, bool is_visited = false)
-            : row_num(row_num_count), block_offset(block_offset_), visited(is_visited) {}
+    RowRef() = default;
+    RowRef(size_t row_num_count, uint8_t block_offset_)
+            : row_num(row_num_count), block_offset(block_offset_) {}
 };
 
-/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
-struct RowRefList : RowRef {
-    /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
-    struct Batch {
-        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.
+struct RowRefWithFlag : public RowRef {
+    bool visited;
 
-        SizeT size = 0; /// It's smaller than size_t but keeps align in Arena.
-        Batch* next;
-        RowRef row_refs[MAX_SIZE];
+    RowRefWithFlag() = default;
+    RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool is_visited = false)
+            : RowRef(row_num_count, block_offset_), visited(is_visited) {}
+};
 
-        Batch(Batch* parent) : next(parent) {}
+/// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
+template <typename RowRefType>
+struct Batch {
+    static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.
 
-        bool full() const { return size == MAX_SIZE; }
+    RowRef::SizeT size = 0; /// It's smaller than size_t but keeps align in Arena.
+    Batch<RowRefType>* next;
+    RowRefType row_refs[MAX_SIZE];
 
-        Batch* insert(RowRef&& row_ref, Arena& pool) {
-            if (full()) {
-                auto batch = pool.alloc<Batch>();
-                *batch = Batch(this);
-                batch->insert(std::move(row_ref), pool);
-                return batch;
-            }
+    Batch(Batch<RowRefType>* parent) : next(parent) {}
+
+    bool full() const { return size == MAX_SIZE; }
 
-            row_refs[size++] = std::move(row_ref);
-            return this;
+    Batch<RowRefType>* insert(RowRefType&& row_ref, Arena& pool) {
+        if (full()) {
+            auto batch = pool.alloc<Batch<RowRefType>>();
+            *batch = Batch<RowRefType>(this);
+            batch->insert(std::move(row_ref), pool);
+            return batch;
         }
-    };
 
-    class ForwardIterator {
-    public:
-        ForwardIterator(RowRefList* begin)
-                : root(begin), first(true), batch(root->next), position(0) {}
+        row_refs[size++] = std::move(row_ref);
+        return this;
+    }
+};
 
-        RowRef& operator*() {
-            if (first) return *root;
-            return batch->row_refs[position];
-        }
-        RowRef* operator->() { return &(**this); }
+template <typename RowRefListType>
+class ForwardIterator {
+public:
+    using RowRefType = typename RowRefListType::RowRefType;
+    ForwardIterator(RowRefListType* begin)
+            : root(begin), first(true), batch(root->next), position(0) {}
 
-        bool operator==(const ForwardIterator& rhs) const {
-            if (ok() != rhs.ok()) {
-                return false;
-            }
-            if (first && rhs.first) {
-                return true;
-            }
-            return batch == rhs.batch && position == rhs.position;
+    RowRefType& operator*() {
+        if (first) return *root;
+        return batch->row_refs[position];
+    }
+    RowRefType* operator->() { return &(**this); }
+
+    bool operator==(const ForwardIterator<RowRefListType>& rhs) const {
+        if (ok() != rhs.ok()) {
+            return false;
+        }
+        if (first && rhs.first) {
+            return true;
         }
-        bool operator!=(const ForwardIterator& rhs) const { return !(*this == rhs); }
+        return batch == rhs.batch && position == rhs.position;
+    }
+    bool operator!=(const ForwardIterator<RowRefListType>& rhs) const { return !(*this == rhs); }
 
-        void operator++() {
-            if (first) {
-                first = false;
-                return;
-            }
+    void operator++() {
+        if (first) {
+            first = false;
+            return;
+        }
 
-            if (batch) {
-                ++position;
-                if (position >= batch->size) {
-                    batch = batch->next;
-                    position = 0;
-                }
+        if (batch) {
+            ++position;
+            if (position >= batch->size) {
+                batch = batch->next;
+                position = 0;
             }
         }
+    }
 
-        bool ok() const { return first || batch; }
+    bool ok() const { return first || batch; }
 
-        static ForwardIterator end() { return ForwardIterator(); }
+    static ForwardIterator<RowRefListType> end() { return ForwardIterator(); }
 
-    private:
-        RowRefList* root;
-        bool first;
-        Batch* batch;
-        size_t position;
+private:
+    RowRefListType* root;
+    bool first;
+    Batch<RowRefType>* batch;
+    size_t position;
+
+    ForwardIterator() : root(nullptr), first(false), batch(nullptr), position(0) {}
+};
 
-        ForwardIterator() : root(nullptr), first(false), batch(nullptr), position(0) {}
-    };
+struct RowRefList : RowRef {
+    using RowRefType = RowRef;
 
-    RowRefList() {}
+    RowRefList() = default;
     RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {}
 
-    ForwardIterator begin() { return ForwardIterator(this); }
-    static ForwardIterator end() { return ForwardIterator::end(); }
+    ForwardIterator<RowRefList> begin() { return ForwardIterator<RowRefList>(this); }
+    static ForwardIterator<RowRefList> end() { return ForwardIterator<RowRefList>::end(); }
+
+    /// insert element after current one
+    void insert(RowRefType&& row_ref, Arena& pool) {
+        row_count++;
+
+        if (!next) {
+            next = pool.alloc<Batch<RowRefType>>();
+            *next = Batch<RowRefType>(nullptr);
+        }
+        next = next->insert(std::move(row_ref), pool);
+    }
+
+    uint32_t get_row_count() { return row_count; }
+
+private:
+    friend class ForwardIterator<RowRefList>;
+
+    Batch<RowRefType>* next = nullptr;
+    uint32_t row_count = 1;
+};
+
+struct RowRefListWithFlag : RowRef {
+    using RowRefType = RowRef;
+
+    RowRefListWithFlag() = default;
+    RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, block_offset_) {}
+
+    ForwardIterator<RowRefListWithFlag> begin() {
+        return ForwardIterator<RowRefListWithFlag>(this);
+    }
+
+    static ForwardIterator<RowRefListWithFlag> end() {
+        return ForwardIterator<RowRefListWithFlag>::end();
+    }
 
     /// insert element after current one
     void insert(RowRef&& row_ref, Arena& pool) {
         row_count++;
 
         if (!next) {
-            next = pool.alloc<Batch>();
-            *next = Batch(nullptr);
+            next = pool.alloc<Batch<RowRefType>>();
+            *next = Batch<RowRefType>(nullptr);
+        }
+        next = next->insert(std::move(row_ref), pool);
+    }
+
+    uint32_t get_row_count() { return row_count; }
+
+    bool visited = false;
+
+private:
+    friend class ForwardIterator<RowRefListWithFlag>;
+
+    Batch<RowRefType>* next = nullptr;
+    uint32_t row_count = 1;
+};
+
+struct RowRefListWithFlags : RowRefWithFlag {
+    using RowRefType = RowRefWithFlag;
+
+    RowRefListWithFlags() = default;
+    RowRefListWithFlags(size_t row_num_, uint8_t block_offset_)
+            : RowRefWithFlag(row_num_, block_offset_) {}
+
+    ForwardIterator<RowRefListWithFlags> begin() {
+        return ForwardIterator<RowRefListWithFlags>(this);
+    }
+    static ForwardIterator<RowRefListWithFlags> end() {
+        return ForwardIterator<RowRefListWithFlags>::end();
+    }
+
+    /// insert element after current one
+    void insert(RowRefWithFlag&& row_ref, Arena& pool) {
+        row_count++;
+
+        if (!next) {
+            next = pool.alloc<Batch<RowRefType>>();
+            *next = Batch<RowRefType>(nullptr);
         }
         next = next->insert(std::move(row_ref), pool);
     }
@@ -134,7 +233,9 @@ struct RowRefList : RowRef {
     uint32_t get_row_count() { return row_count; }
 
 private:
-    Batch* next = nullptr;
+    friend class ForwardIterator<RowRefListWithFlags>;
+
+    Batch<RowRefType>* next = nullptr;
     uint32_t row_count = 1;
 };
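
The comment block added to join_op.h above describes the new RowRef hierarchy: the per-row visited flag is no longer part of the base RowRef, so joins that never consult it stop paying for it, and joins that only need one flag per key store it once on the list. The snippet below is a minimal, self-contained sketch of that layout; it replaces the Arena-backed Batch storage with std::vector and drops the insert/iterator machinery purely for brevity, so it is not the Doris implementation. It compiles standalone with C++17.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Basic row reference: which build block the row lives in, and its row number.
    struct RowRef {
        uint32_t row_num = 0;
        uint8_t block_offset = 0;
    };

    // A row reference that carries its own visited flag (needed when
    // other_conjuncts may accept some rows of a key and reject others).
    struct RowRefWithFlag : RowRef {
        bool visited = false;
    };

    // Plain list: for joins that never look at visited flags (e.g. inner join).
    struct RowRefList {
        std::vector<RowRef> rows;
    };

    // One flag for the whole list: for right semi/anti/outer joins without
    // other_conjuncts, where every row of a key becomes visited together.
    struct RowRefListWithFlag {
        std::vector<RowRef> rows;
        bool visited = false;
    };

    // One flag per row: for joins with other_conjuncts, where each row of a key
    // is accepted or rejected individually.
    struct RowRefListWithFlags {
        std::vector<RowRefWithFlag> rows;
    };

    int main() {
        RowRefListWithFlag per_key;
        per_key.rows.push_back({0, 0});
        per_key.rows.push_back({5, 0});
        per_key.visited = true; // a probe row matched this key: mark the list once

        RowRefListWithFlags per_row;
        per_row.rows.push_back({{1, 0}, false});
        per_row.rows.push_back({{2, 0}, false});
        per_row.rows[1].visited = true; // only the row passing the other conjunct

        std::cout << "per-key visited: " << per_key.visited
                  << ", per-row visited of second row: " << per_row.rows[1].visited
                  << std::endl;
        return 0;
    }

In the actual patch, the probe and output code then selects behavior at compile time with checks such as `if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>)`, as can be seen in the vhash_join_node.cpp diff below.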
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 8126e7732d..fad0eb68cf 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -34,6 +34,16 @@ namespace doris::vectorized {
 //  SQL hint to allow users to tune by hand.
 static constexpr int PREFETCH_STEP = 64;
 
+template Status HashJoinNode::_extract_join_column<true>(
+        Block&, COW<IColumn>::mutable_ptr<ColumnVector<unsigned char>>&,
+        std::vector<IColumn const*, std::allocator<IColumn const*>>&,
+        std::vector<int, std::allocator<int>> const&);
+
+template Status HashJoinNode::_extract_join_column<false>(
+        Block&, COW<IColumn>::mutable_ptr<ColumnVector<unsigned char>>&,
+        std::vector<IColumn const*, std::allocator<IColumn const*>>&,
+        std::vector<int, std::allocator<int>> const&);
+
 using ProfileCounter = RuntimeProfile::Counter;
 template <class HashTableContext>
 struct ProcessHashTableBuild {
@@ -48,7 +58,8 @@ struct ProcessHashTableBuild {
               _offset(offset),
               _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}
 
-    template <bool ignore_null, bool build_unique, bool has_runtime_filter>
+    template <bool need_null_map_for_build, bool ignore_null, bool build_unique,
+              bool has_runtime_filter>
     void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
@@ -81,7 +92,7 @@ struct ProcessHashTableBuild {
         {
             SCOPED_TIMER(_build_side_compute_hash_timer);
             for (size_t k = 0; k < _rows; ++k) {
-                if constexpr (ignore_null) {
+                if constexpr (ignore_null && need_null_map_for_build) {
                     if ((*null_map)[k]) {
                         continue;
                     }
@@ -97,7 +108,7 @@ struct ProcessHashTableBuild {
         }
 
         for (size_t k = 0; k < _rows; ++k) {
-            if constexpr (ignore_null) {
+            if constexpr (ignore_null && need_null_map_for_build) {
                 if ((*null_map)[k]) {
                     continue;
                 }
@@ -132,15 +143,6 @@ struct ProcessHashTableBuild {
                        hash_table_ctx.hash_table.get_resize_timer_value());
     }
 
-    template <bool ignore_null, bool build_unique, bool has_runtime_filter>
-    struct Reducer {
-        template <typename... TArgs>
-        static void run(ProcessHashTableBuild<HashTableContext>& build, TArgs&&... args) {
-            build.template run<ignore_null, build_unique, has_runtime_filter>(
-                    std::forward<TArgs>(args)...);
-        }
-    };
-
 private:
     const int _rows;
     int _skip_rows;
@@ -308,7 +310,7 @@ void ProcessHashTableProbe<JoinOpType, ignore_null>::probe_side_output_column(
 }
 
 template <class JoinOpType, bool ignore_null>
-template <typename HashTableType>
+template <bool need_null_map_for_probe, typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType& hash_table_ctx,
                                                                   ConstNullMapPtr null_map,
                                                                   MutableBlock& mutable_block,
@@ -335,11 +337,6 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
     auto& mcol = mutable_block.mutable_columns();
     int current_offset = 0;
 
-    constexpr auto need_to_set_visited = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
-                                         JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
-                                         JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
-                                         JoinOpType::value == TJoinOp::FULL_OUTER_JOIN;
-
     constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
                                              JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN;
 
@@ -351,7 +348,7 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
     {
         SCOPED_TIMER(_search_hashtable_timer);
         while (probe_index < probe_rows) {
-            if constexpr (ignore_null) {
+            if constexpr (ignore_null && need_null_map_for_probe) {
                 if ((*null_map)[probe_index]) {
                     _items_counts[probe_index++] = (uint32_t)0;
                     all_match_one = false;
@@ -360,7 +357,9 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
             }
             int last_offset = current_offset;
             auto find_result =
-                    (*null_map)[probe_index]
+                    !need_null_map_for_probe
+                            ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena)
+                    : (*null_map)[probe_index]
                             ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
                                                            _arena)) {nullptr, false}
                             : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena);
@@ -382,7 +381,9 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
                     // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
                     // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
                     if (mapped.get_row_count() == 1) {
-                        if constexpr (need_to_set_visited) mapped.visited = true;
+                        if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
+                            mapped.visited = true;
+                        }
 
                         if constexpr (!is_right_semi_anti_join) {
                             if (LIKELY(current_offset < _build_block_rows.size())) {
@@ -406,7 +407,9 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
                                 }
                                 ++current_offset;
                             }
-                            if constexpr (need_to_set_visited) it->visited = true;
+                        }
+                        if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
+                            mapped.visited = true;
                         }
                     }
                 } else {
@@ -452,8 +455,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
 }
 
 template <class JoinOpType, bool ignore_null>
-template <typename HashTableType>
-Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_join_conjunts(
+template <bool need_null_map_for_probe, typename HashTableType>
+Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_join_conjuncts(
         HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block,
         Block* output_block, size_t probe_rows) {
     auto& probe_index = _join_node->_probe_index;
@@ -468,232 +471,246 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
 
     using KeyGetter = typename HashTableType::State;
     using Mapped = typename HashTableType::Mapped;
-    KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
+    if constexpr (std::is_same_v<Mapped, RowRefListWithFlags>) {
+        KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
 
-    int right_col_idx = _join_node->_left_table_data_types.size();
-    int right_col_len = _join_node->_right_table_data_types.size();
+        int right_col_idx = _join_node->_left_table_data_types.size();
+        int right_col_len = _join_node->_right_table_data_types.size();
 
-    auto& mcol = mutable_block.mutable_columns();
-    // use in right join to change visited state after
-    // exec the vother join conjunt
-    std::vector<bool*> visited_map;
-    visited_map.reserve(1.2 * _batch_size);
+        auto& mcol = mutable_block.mutable_columns();
+        // used in right join to change the visited state after
+        // executing the other join conjunct
+        std::vector<bool*> visited_map;
+        visited_map.reserve(1.2 * _batch_size);
 
-    std::vector<bool> same_to_prev;
-    same_to_prev.reserve(1.2 * _batch_size);
+        std::vector<bool> same_to_prev;
+        same_to_prev.reserve(1.2 * _batch_size);
 
-    int current_offset = 0;
+        int current_offset = 0;
 
-    bool all_match_one = true;
-    int last_probe_index = probe_index;
-    while (probe_index < probe_rows) {
-        // ignore null rows
-        if constexpr (ignore_null) {
-            if ((*null_map)[probe_index]) {
-                _items_counts[probe_index++] = (uint32_t)0;
-                continue;
+        bool all_match_one = true;
+        int last_probe_index = probe_index;
+        while (probe_index < probe_rows) {
+            // ignore null rows
+            if constexpr (ignore_null && need_null_map_for_probe) {
+                if ((*null_map)[probe_index]) {
+                    _items_counts[probe_index++] = (uint32_t)0;
+                    continue;
+                }
             }
-        }
 
-        auto last_offset = current_offset;
-        auto find_result =
-                (*null_map)[probe_index]
-                        ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
-                                                       _arena)) {nullptr, false}
-                        : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena);
-        if (probe_index + PREFETCH_STEP < probe_rows)
-            key_getter.template prefetch<true>(hash_table_ctx.hash_table,
-                                               probe_index + PREFETCH_STEP, _arena);
-        if (find_result.is_found()) {
-            auto& mapped = find_result.get_mapped();
-            auto origin_offset = current_offset;
-            // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
-            // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
-            if (mapped.get_row_count() == 1) {
-                if (LIKELY(current_offset < _build_block_rows.size())) {
-                    _build_block_offsets[current_offset] = mapped.block_offset;
-                    _build_block_rows[current_offset] = mapped.row_num;
-                } else {
-                    _build_block_offsets.emplace_back(mapped.block_offset);
-                    _build_block_rows.emplace_back(mapped.row_num);
-                }
-                ++current_offset;
-                visited_map.emplace_back(&mapped.visited);
-            } else {
-                for (auto it = mapped.begin(); it.ok(); ++it) {
+            auto last_offset = current_offset;
+            auto find_result =
+                    !need_null_map_for_probe
+                            ? key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena)
+                    : (*null_map)[probe_index]
+                            ? decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
+                                                           _arena)) {nullptr, false}
+                            : key_getter.find_key(hash_table_ctx.hash_table, probe_index, _arena);
+            if (probe_index + PREFETCH_STEP < probe_rows)
+                key_getter.template prefetch<true>(hash_table_ctx.hash_table,
+                                                   probe_index + PREFETCH_STEP, _arena);
+            if (find_result.is_found()) {
+                auto& mapped = find_result.get_mapped();
+                auto origin_offset = current_offset;
+                // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
+                // We should rethink whether to use this iterator mode in the future. For now, just optimize the one-row case.
+                if (mapped.get_row_count() == 1) {
                     if (LIKELY(current_offset < _build_block_rows.size())) {
-                        _build_block_offsets[current_offset] = it->block_offset;
-                        _build_block_rows[current_offset] = it->row_num;
+                        _build_block_offsets[current_offset] = mapped.block_offset;
+                        _build_block_rows[current_offset] = mapped.row_num;
                     } else {
-                        _build_block_offsets.emplace_back(it->block_offset);
-                        _build_block_rows.emplace_back(it->row_num);
+                        _build_block_offsets.emplace_back(mapped.block_offset);
+                        _build_block_rows.emplace_back(mapped.row_num);
                     }
                     ++current_offset;
-                    visited_map.emplace_back(&it->visited);
+                    visited_map.emplace_back(&mapped.visited);
+                } else {
+                    for (auto it = mapped.begin(); it.ok(); ++it) {
+                        if (LIKELY(current_offset < _build_block_rows.size())) {
+                            _build_block_offsets[current_offset] = it->block_offset;
+                            _build_block_rows[current_offset] = it->row_num;
+                        } else {
+                            _build_block_offsets.emplace_back(it->block_offset);
+                            _build_block_rows.emplace_back(it->row_num);
+                        }
+                        ++current_offset;
+                        visited_map.emplace_back(&it->visited);
+                    }
                 }
-            }
-            same_to_prev.emplace_back(false);
-            for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
-                same_to_prev.emplace_back(true);
-            }
-        } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
-                             JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
-                             JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
-            same_to_prev.emplace_back(false);
-            visited_map.emplace_back(nullptr);
-            // only full outer / left outer need insert the data of right table
-            // left anti use -1 use a default value
-            if (LIKELY(current_offset < _build_block_rows.size())) {
-                _build_block_offsets[current_offset] = -1;
-                _build_block_rows[current_offset] = -1;
+                same_to_prev.emplace_back(false);
+                for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
+                    same_to_prev.emplace_back(true);
+                }
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
+                                 JoinOpType::value == TJoinOp::FULL_OUTER_JOIN ||
+                                 JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+                same_to_prev.emplace_back(false);
+                visited_map.emplace_back(nullptr);
+                // only full outer / left outer joins need to insert data from the right table;
+                // left anti join uses -1 as a default value
+                if (LIKELY(current_offset < _build_block_rows.size())) {
+                    _build_block_offsets[current_offset] = -1;
+                    _build_block_rows[current_offset] = -1;
+                } else {
+                    _build_block_offsets.emplace_back(-1);
+                    _build_block_rows.emplace_back(-1);
+                }
+                ++current_offset;
             } else {
-                _build_block_offsets.emplace_back(-1);
-                _build_block_rows.emplace_back(-1);
+                // other join, do nothing
+            }
+            uint32_t count = (uint32_t)(current_offset - last_offset);
+            _items_counts[probe_index++] = count;
+            all_match_one &= (count == 1);
+            if (current_offset >= _batch_size && !all_match_one) {
+                break;
             }
-            ++current_offset;
-        } else {
-            // other join, no nothing
         }
 
-        uint32_t count = (uint32_t)(current_offset - last_offset);
-        _items_counts[probe_index++] = count;
-        all_match_one &= (count == 1);
-        if (current_offset >= _batch_size && !all_match_one) {
-            break;
+        {
+            SCOPED_TIMER(_build_side_output_timer);
+            build_side_output_column<true>(mcol, right_col_idx, right_col_len,
+                                           _join_node->_right_output_slot_flags, current_offset);
         }
-    }
-
-    {
-        SCOPED_TIMER(_build_side_output_timer);
-        build_side_output_column<true>(mcol, right_col_idx, right_col_len,
-                                       _join_node->_right_output_slot_flags, current_offset);
-    }
-    {
-        SCOPED_TIMER(_probe_side_output_timer);
-        probe_side_output_column<true>(mcol, _join_node->_left_output_slot_flags, current_offset,
-                                       last_probe_index, probe_index - last_probe_index,
-                                       all_match_one);
-    }
-    output_block->swap(mutable_block.to_block());
-
-    // dispose the other join conjunt exec
-    if (output_block->rows()) {
-        int result_column_id = -1;
-        int orig_columns = output_block->columns();
-        (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id);
+        {
+            SCOPED_TIMER(_probe_side_output_timer);
+            probe_side_output_column<true>(mcol, _join_node->_left_output_slot_flags,
+                                           current_offset, last_probe_index,
+                                           probe_index - last_probe_index, all_match_one);
+        }
+        output_block->swap(mutable_block.to_block());
+
+        // handle execution of the other join conjunct
+        if (output_block->rows()) {
+            int result_column_id = -1;
+            int orig_columns = output_block->columns();
+            (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id);
+
+            auto column = output_block->get_by_position(result_column_id).column;
+            if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
+                auto new_filter_column = ColumnVector<UInt8>::create();
+                auto& filter_map = new_filter_column->get_data();
+
+                auto null_map_column = ColumnVector<UInt8>::create(column->size(), 0);
+                auto* __restrict null_map_data = null_map_column->get_data().data();
+
+                for (int i = 0; i < column->size(); ++i) {
+                    auto join_hit = visited_map[i] != nullptr;
+                    auto other_hit = column->get_bool(i);
+
+                    if (!other_hit) {
+                        for (size_t j = 0; j < right_col_len; ++j) {
+                            typeid_cast<ColumnNullable*>(
+                                    std::move(*output_block->get_by_position(j + right_col_idx)
+                                                       .column)
+                                            .assume_mutable()
+                                            .get())
+                                    ->get_null_map_data()[i] = true;
+                        }
+                    }
+                    null_map_data[i] = !join_hit || !other_hit;
+
+                    if (join_hit) {
+                        *visited_map[i] |= other_hit;
+                        filter_map.push_back(other_hit || !same_to_prev[i] ||
+                                             (!column->get_bool(i - 1) && filter_map.back()));
+                        // Keep only rows that hit the join conjunct and whose other join conjunct is true.
+                        // If there is no such row, the single row kept for the probe key is output with null right-table columns.
+                        if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1))
+                            filter_map[i - 1] = false;
+                    } else {
+                        filter_map.push_back(true);
+                    }
+                }
 
-        auto column = output_block->get_by_position(result_column_id).column;
-        if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
-                      JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
-            auto new_filter_column = ColumnVector<UInt8>::create();
-            auto& filter_map = new_filter_column->get_data();
-
-            auto null_map_column = ColumnVector<UInt8>::create(column->size(), 0);
-            auto* __restrict null_map_data = null_map_column->get_data().data();
-
-            for (int i = 0; i < column->size(); ++i) {
-                auto join_hit = visited_map[i] != nullptr;
-                auto other_hit = column->get_bool(i);
-
-                if (!other_hit) {
-                    for (size_t j = 0; j < right_col_len; ++j) {
-                        typeid_cast<ColumnNullable*>(
-                                std::move(*output_block->get_by_position(j + right_col_idx).column)
-                                        .assume_mutable()
-                                        .get())
-                                ->get_null_map_data()[i] = true;
+                for (int i = 0; i < column->size(); ++i) {
+                    if (filter_map[i]) {
+                        _tuple_is_null_right_flags->emplace_back(null_map_data[i]);
                     }
                 }
-                null_map_data[i] = !join_hit || !other_hit;
-
-                if (join_hit) {
-                    *visited_map[i] |= other_hit;
-                    filter_map.push_back(other_hit || !same_to_prev[i] ||
-                                         (!column->get_bool(i - 1) && filter_map.back()));
-                    // Here to keep only hit join conjunt and other join conjunt is true need to be output.
-                    // if not, only some key must keep one row will output will null right table column
-                    if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1))
-                        filter_map[i - 1] = false;
-                } else {
-                    filter_map.push_back(true);
+                output_block->get_by_position(result_column_id).column =
+                        std::move(new_filter_column);
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) {
+                auto new_filter_column = ColumnVector<UInt8>::create();
+                auto& filter_map = new_filter_column->get_data();
+
+                if (!column->empty()) {
+                    filter_map.emplace_back(column->get_bool(0));
+                }
+                for (int i = 1; i < column->size(); ++i) {
+                    if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) {
+                        // Only last same element is true, output last one
+                        filter_map.push_back(true);
+                        filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
+                    } else {
+                        filter_map.push_back(false);
+                    }
                 }
-            }
 
-            for (int i = 0; i < column->size(); ++i) {
-                if (filter_map[i]) {
-                    _tuple_is_null_right_flags->emplace_back(null_map_data[i]);
+                output_block->get_by_position(result_column_id).column =
+                        std::move(new_filter_column);
+            } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+                auto new_filter_column = ColumnVector<UInt8>::create();
+                auto& filter_map = new_filter_column->get_data();
+
+                if (!column->empty()) {
+                    filter_map.emplace_back(column->get_bool(0) && visited_map[0]);
                 }
-            }
-            output_block->get_by_position(result_column_id).column = std::move(new_filter_column);
-        } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) {
-            auto new_filter_column = ColumnVector<UInt8>::create();
-            auto& filter_map = new_filter_column->get_data();
-
-            if (!column->empty()) filter_map.emplace_back(column->get_bool(0));
-            for (int i = 1; i < column->size(); ++i) {
-                if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) {
-                    // Only last same element is true, output last one
-                    filter_map.push_back(true);
-                    filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
-                } else {
-                    filter_map.push_back(false);
+                for (int i = 1; i < column->size(); ++i) {
+                    if ((visited_map[i] && column->get_bool(i)) ||
+                        (same_to_prev[i] && filter_map[i - 1])) {
+                        filter_map.push_back(true);
+                        filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
+                    } else {
+                        filter_map.push_back(false);
+                    }
                 }
-            }
 
-            output_block->get_by_position(result_column_id).column = std::move(new_filter_column);
-        } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
-            auto new_filter_column = ColumnVector<UInt8>::create();
-            auto& filter_map = new_filter_column->get_data();
-
-            if (!column->empty()) filter_map.emplace_back(column->get_bool(0) && visited_map[0]);
-            for (int i = 1; i < column->size(); ++i) {
-                if ((visited_map[i] && column->get_bool(i)) ||
-                    (same_to_prev[i] && filter_map[i - 1])) {
-                    filter_map.push_back(true);
-                    filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1];
-                } else {
-                    filter_map.push_back(false);
+                // Same to the semi join, but change the last value to opposite value
+                for (int i = 1; i < same_to_prev.size(); ++i) {
+                    if (!same_to_prev[i]) filter_map[i - 1] = !filter_map[i - 1];
+                }
+                filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1];
+
+                output_block->get_by_position(result_column_id).column =
+                        std::move(new_filter_column);
+            } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
+                                 JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
+                for (int i = 0; i < column->size(); ++i) {
+                    DCHECK(visited_map[i]);
+                    *visited_map[i] |= column->get_bool(i);
+                }
+            } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) {
+                auto filter_size = 0;
+                for (int i = 0; i < column->size(); ++i) {
+                    DCHECK(visited_map[i]);
+                    auto result = column->get_bool(i);
+                    *visited_map[i] |= result;
+                    filter_size += result;
                 }
+                _tuple_is_null_left_flags->resize_fill(filter_size, 0);
+            } else {
+                // inner join do nothing
             }
 
-            // Same to the semi join, but change the last value to opposite value
-            for (int i = 1; i < same_to_prev.size(); ++i) {
-                if (!same_to_prev[i]) filter_map[i - 1] = !filter_map[i - 1];
-            }
-            filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1];
-
-            output_block->get_by_position(result_column_id).column = std::move(new_filter_column);
-        } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
-                             JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
-            for (int i = 0; i < column->size(); ++i) {
-                DCHECK(visited_map[i]);
-                *visited_map[i] |= column->get_bool(i);
-            }
-        } else if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN) {
-            auto filter_size = 0;
-            for (int i = 0; i < column->size(); ++i) {
-                DCHECK(visited_map[i]);
-                auto result = column->get_bool(i);
-                *visited_map[i] |= result;
-                filter_size += result;
+            if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
+                          JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
+                output_block->clear();
+            } else {
+                if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN ||
+                              JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
+                    orig_columns = right_col_idx;
+                Block::filter_block(output_block, result_column_id, orig_columns);
             }
-            _tuple_is_null_left_flags->resize_fill(filter_size, 0);
-        } else {
-            // inner join do nothing
         }
 
-        if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
-                      JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN) {
-            output_block->clear();
-        } else {
-            if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN ||
-                          JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
-                orig_columns = right_col_idx;
-            Block::filter_block(output_block, result_column_id, orig_columns);
-        }
+        return Status::OK();
+    } else {
+        LOG(FATAL) << "Invalid RowRefList";
+        return Status::InvalidArgument("Invalid RowRefList");
     }
-
-    return Status::OK();
 }
 
 template <class JoinOpType, bool ignore_null>
@@ -701,57 +718,80 @@ template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType, ignore_null>::process_data_in_hashtable(
         HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block,
         bool* eos) {
-    hash_table_ctx.init_once();
-    auto& mcol = mutable_block.mutable_columns();
-
-    bool right_semi_anti_without_other =
-            _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct;
-    int right_col_idx =
-            right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size();
-    int right_col_len = _join_node->_right_table_data_types.size();
-
-    auto& iter = hash_table_ctx.iter;
-    auto block_size = 0;
-
-    auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
-        block_size++;
-        for (size_t j = 0; j < right_col_len; ++j) {
-            auto& column = *_build_blocks[offset].get_by_position(j).column;
-            mcol[j + right_col_idx]->insert_from(column, row_num);
-        }
-    };
-
-    for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) {
-        auto& mapped = iter->get_second();
-        for (auto it = mapped.begin(); it.ok(); ++it) {
-            if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) {
-                if (it->visited) insert_from_hash_table(it->block_offset, it->row_num);
+    using Mapped = typename HashTableType::Mapped;
+    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
+                  std::is_same_v<Mapped, RowRefListWithFlags>) {
+        hash_table_ctx.init_once();
+        auto& mcol = mutable_block.mutable_columns();
+
+        bool right_semi_anti_without_other =
+                _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct;
+        int right_col_idx =
+                right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size();
+        int right_col_len = _join_node->_right_table_data_types.size();
+
+        auto& iter = hash_table_ctx.iter;
+        auto block_size = 0;
+
+        auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
+            block_size++;
+            for (size_t j = 0; j < right_col_len; ++j) {
+                auto& column = *_build_blocks[offset].get_by_position(j).column;
+                mcol[j + right_col_idx]->insert_from(column, row_num);
+            }
+        };
+
+        for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) {
+            auto& mapped = iter->get_second();
+            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
+                if (mapped.visited) {
+                    for (auto it = mapped.begin(); it.ok(); ++it) {
+                        if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) {
+                            insert_from_hash_table(it->block_offset, it->row_num);
+                        }
+                    }
+                } else {
+                    for (auto it = mapped.begin(); it.ok(); ++it) {
+                        if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN) {
+                            insert_from_hash_table(it->block_offset, it->row_num);
+                        }
+                    }
+                }
             } else {
-                if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num);
+                for (auto it = mapped.begin(); it.ok(); ++it) {
+                    if constexpr (JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN) {
+                        if (it->visited) insert_from_hash_table(it->block_offset, it->row_num);
+                    } else {
+                        if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num);
+                    }
+                }
             }
         }
-    }
 
-    // just resize the left table column in case with other conjunct to make block size is not zero
-    if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) {
-        auto target_size = mcol[right_col_idx]->size();
-        for (int i = 0; i < right_col_idx; ++i) {
-            mcol[i]->resize(target_size);
+        // just resize the left table columns in the case with other conjuncts so that the block size is not zero
+        if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) {
+            auto target_size = mcol[right_col_idx]->size();
+            for (int i = 0; i < right_col_idx; ++i) {
+                mcol[i]->resize(target_size);
+            }
         }
-    }
 
-    // right outer join / full join need insert data of left table
-    if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
-                  JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
-        for (int i = 0; i < right_col_idx; ++i) {
-            assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
+        // right outer join / full join need to insert data from the left table
+        if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
+                      JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
+            for (int i = 0; i < right_col_idx; ++i) {
+                assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
+            }
+            _tuple_is_null_left_flags->resize_fill(block_size, 1);
         }
-        _tuple_is_null_left_flags->resize_fill(block_size, 1);
+        *eos = iter == hash_table_ctx.hash_table.end();
+        output_block->swap(
+                mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0));
+        return Status::OK();
+    } else {
+        LOG(FATAL) << "Invalid RowRefList";
+        return Status::InvalidArgument("Invalid RowRefList");
     }
-    *eos = iter == hash_table_ctx.hash_table.end();
-
-    output_block->swap(mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0));
-    return Status::OK();
 }
 
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -832,6 +872,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _probe_not_ignore_null.emplace_back(
                 null_aware ||
                 (_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null));
+        _build_side_ignore_null |= !_build_not_ignore_null.back();
     }
     for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
         _probe_ignore_null |= !_probe_not_ignore_null[i];
@@ -840,9 +881,9 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     _probe_column_disguise_null.reserve(eq_join_conjuncts.size());
 
     if (tnode.hash_join_node.__isset.vother_join_conjunct) {
-        _vother_join_conjunct_ptr.reset(new doris::vectorized::VExprContext*);
-        RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(
-                _pool, tnode.hash_join_node.vother_join_conjunct, _vother_join_conjunct_ptr.get()));
+        _vother_join_conjunct_ptr.reset(new VExprContext*);
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, tnode.hash_join_node.vother_join_conjunct,
+                                                _vother_join_conjunct_ptr.get()));
 
         // If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate,
         // build table should not be deduplicated.
@@ -1003,19 +1044,29 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
 
             int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
             _probe_columns.resize(probe_expr_ctxs_sz);
-            if (_null_map_column == nullptr) {
-                _null_map_column = ColumnUInt8::create();
+
+            std::vector<int> res_col_ids(probe_expr_ctxs_sz);
+            RETURN_IF_ERROR(_do_evaluate(_probe_block, _probe_expr_ctxs, *_probe_expr_call_timer,
+                                         res_col_ids));
+            // TODO: For now we cannot tell whether a column is nullable from ExecNode's `row_desc` alone,
+            //  so we have to initialize this flag from the first probe block.
+            if (!_has_set_need_null_map_for_probe) {
+                _has_set_need_null_map_for_probe = true;
+                _need_null_map_for_probe = _need_null_map<false>(_probe_block, res_col_ids);
+            }
+            if (_need_null_map_for_probe) {
+                if (_null_map_column == nullptr) {
+                    _null_map_column = ColumnUInt8::create();
+                }
+                _null_map_column->get_data().assign(probe_rows, (uint8_t)0);
             }
-            _null_map_column->get_data().assign(probe_rows, (uint8_t)0);
 
             Status st = std::visit(
                     [&](auto&& arg) -> Status {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                            auto& null_map_val = _null_map_column->get_data();
-                            return _extract_probe_join_column(_probe_block, null_map_val,
-                                                              _probe_columns,
-                                                              *_probe_expr_call_timer);
+                            return _extract_join_column<false>(_probe_block, _null_map_column,
+                                                               _probe_columns, res_col_ids);
                         } else {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
@@ -1033,24 +1084,35 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
     Block temp_block;
 
     if (_probe_index < _probe_block.rows()) {
+        DCHECK(_has_set_need_null_map_for_probe);
         std::visit(
-                [&](auto&& arg, auto&& process_hashtable_ctx, auto have_other_join_conjunct) {
+                [&](auto&& arg, auto&& process_hashtable_ctx, auto have_other_join_conjunct,
+                    auto need_null_map_for_probe) {
                     using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
                     if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         if constexpr (have_other_join_conjunct) {
                             if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                                st = process_hashtable_ctx.do_process_with_other_join_conjunts(
-                                        arg, &_null_map_column->get_data(), mutable_join_block,
-                                        &temp_block, probe_rows);
+                                st = process_hashtable_ctx
+                                             .template do_process_with_other_join_conjuncts<
+                                                     need_null_map_for_probe>(
+                                                     arg,
+                                                     need_null_map_for_probe
+                                                             ? &_null_map_column->get_data()
+                                                             : nullptr,
+                                                     mutable_join_block, &temp_block, probe_rows);
                             } else {
                                 LOG(FATAL) << "FATAL: uninited hash table";
                             }
                         } else {
                             if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                                st = process_hashtable_ctx.do_process(
-                                        arg, &_null_map_column->get_data(), mutable_join_block,
-                                        &temp_block, probe_rows);
+                                st = process_hashtable_ctx
+                                             .template do_process<need_null_map_for_probe>(
+                                                     arg,
+                                                     need_null_map_for_probe
+                                                             ? &_null_map_column->get_data()
+                                                             : nullptr,
+                                                     mutable_join_block, &temp_block, probe_rows);
                             } else {
                                 LOG(FATAL) << "FATAL: uninited hash table";
                             }
@@ -1060,7 +1122,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
                     }
                 },
                 _hash_table_variants, _process_hashtable_ctx_variants,
-                make_bool_variant(_have_other_join_conjunct));
+                make_bool_variant(_have_other_join_conjunct),
+                make_bool_variant(_need_null_map_for_probe));
     } else if (_probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
             std::visit(
@@ -1237,35 +1300,31 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
             _hash_table_variants);
 }
 
-// TODO:: unify the code of extract probe join column
-Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map,
-                                                ColumnRawPtrs& raw_ptrs, bool& ignore_null,
-                                                RuntimeProfile::Counter& expr_call_timer) {
+template <bool BuildSide>
+Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map,
+                                          ColumnRawPtrs& raw_ptrs,
+                                          const std::vector<int>& res_col_ids) {
+    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
     for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
-        int result_col_id = -1;
-        // execute build column
-        {
-            SCOPED_TIMER(&expr_call_timer);
-            RETURN_IF_ERROR(_build_expr_ctxs[i]->execute(&block, &result_col_id));
-        }
-
-        // TODO: opt the column is const
-        block.get_by_position(result_col_id).column =
-                block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
-
         if (_is_null_safe_eq_join[i]) {
-            raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
+            raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get();
         } else {
-            auto column = block.get_by_position(result_col_id).column.get();
+            auto column = block.get_by_position(res_col_ids[i]).column.get();
             if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
                 auto& col_nested = nullable->get_nested_column();
                 auto& col_nullmap = nullable->get_null_map_data();
 
-                ignore_null |= !_build_not_ignore_null[i];
+                if constexpr (!BuildSide) {
+                    DCHECK(null_map != nullptr);
+                    VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
+                }
                 if (_build_not_ignore_null[i]) {
                     raw_ptrs[i] = nullable;
                 } else {
-                    VectorizedUtils::update_null_map(null_map, col_nullmap);
+                    if constexpr (BuildSide) {
+                        DCHECK(null_map != nullptr);
+                        VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
+                    }
                     raw_ptrs[i] = &col_nested;
                 }
             } else {
@@ -1276,49 +1335,45 @@ Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map,
     return Status::OK();
 }
 
-Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map,
-                                                ColumnRawPtrs& raw_ptrs,
-                                                RuntimeProfile::Counter& expr_call_timer) {
-    for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
+Status HashJoinNode::_do_evaluate(Block& block, std::vector<VExprContext*>& exprs,
+                                  RuntimeProfile::Counter& expr_call_timer,
+                                  std::vector<int>& res_col_ids) {
+    for (size_t i = 0; i < exprs.size(); ++i) {
         int result_col_id = -1;
         // execute build column
         {
             SCOPED_TIMER(&expr_call_timer);
-            RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(&block, &result_col_id));
+            RETURN_IF_ERROR(exprs[i]->execute(&block, &result_col_id));
         }
 
         // TODO: opt the column is const
         block.get_by_position(result_col_id).column =
                 block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
+        res_col_ids[i] = result_col_id;
+    }
+    return Status::OK();
+}
 
-        if (_is_null_safe_eq_join[i]) {
-            raw_ptrs[i] = block.get_by_position(result_col_id).column.get();
-        } else {
-            auto column = block.get_by_position(result_col_id).column.get();
-            if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
-                auto& col_nested = nullable->get_nested_column();
-                auto& col_nullmap = nullable->get_null_map_data();
-
-                VectorizedUtils::update_null_map(null_map, col_nullmap);
-                if (_build_not_ignore_null[i]) {
-                    raw_ptrs[i] = nullable;
-                } else {
-                    raw_ptrs[i] = &col_nested;
+template <bool BuildSide>
+bool HashJoinNode::_need_null_map(Block& block, const std::vector<int>& res_col_ids) {
+    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
+    for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
+        if (!_is_null_safe_eq_join[i]) {
+            auto column = block.get_by_position(res_col_ids[i]).column.get();
+            if constexpr (BuildSide) {
+                if (check_and_get_column<ColumnNullable>(*column)) {
+                    if (!_build_not_ignore_null[i]) {
+                        return true;
+                    }
                 }
             } else {
-                if (_build_not_ignore_null[i]) {
-                    auto column_ptr =
-                            make_nullable(block.get_by_position(result_col_id).column, false);
-                    _probe_column_disguise_null.emplace_back(block.columns());
-                    block.insert({column_ptr,
-                                  make_nullable(block.get_by_position(result_col_id).type), ""});
-                    column = column_ptr.get();
+                if (check_and_get_column<ColumnNullable>(*column)) {
+                    return true;
                 }
-                raw_ptrs[i] = column;
             }
         }
     }
-    return Status::OK();
+    return false;
 }
 
 Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) {
@@ -1334,17 +1389,26 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
 
     ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
 
-    NullMap null_map_val(rows);
-    null_map_val.assign(rows, (uint8_t)0);
-    bool has_null = false;
+    ColumnUInt8::MutablePtr null_map_val;
+    std::vector<int> res_col_ids(_build_expr_ctxs.size());
+    RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
+    // TODO: For now we cannot tell whether a column is nullable from ExecNode's `row_desc` alone,
+    //  so we have to initialize this flag from the first build block.
+    if (!_has_set_need_null_map_for_build) {
+        _has_set_need_null_map_for_build = true;
+        _need_null_map_for_build = _need_null_map<true>(block, res_col_ids);
+    }
+    if (_need_null_map_for_build) {
+        null_map_val = ColumnUInt8::create();
+        null_map_val->get_data().assign(rows, (uint8_t)0);
+    }
 
     // Get the key column that needs to be built
     Status st = std::visit(
             [&](auto&& arg) -> Status {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    return _extract_build_join_column(block, null_map_val, raw_ptrs, has_null,
-                                                      *_build_expr_call_timer);
+                    return _extract_join_column<true>(block, null_map_val, raw_ptrs, res_col_ids);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
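
The hunk above defers the `_need_null_map_for_build` decision until the first build block arrives, since a key column's nullability is only known once the block is materialized; the result is then cached via `_has_set_need_null_map_for_build`. A minimal standalone sketch of that compute-once-and-cache pattern (the `FakeColumn`/`BuildSide` names are toy stand-ins, not Doris types):

#include <iostream>
#include <optional>
#include <vector>

struct FakeColumn {
    bool nullable; // stand-in for check_and_get_column<ColumnNullable>(...)
};

class BuildSide {
public:
    // Decide the flag from the first block only; later calls reuse the cached value.
    bool need_null_map(const std::vector<FakeColumn>& key_columns) {
        if (!_need_null_map.has_value()) {
            bool any_nullable = false;
            for (const auto& col : key_columns) {
                any_nullable |= col.nullable;
            }
            _need_null_map = any_nullable;
        }
        return *_need_null_map;
    }

private:
    std::optional<bool> _need_null_map; // unset until the first build block is seen
};

int main() {
    BuildSide side;
    std::cout << side.need_null_map({{false}, {true}}) << '\n';  // 1: decided from the first "block"
    std::cout << side.need_null_map({{false}, {false}}) << '\n'; // still 1: cached, later blocks do not change it
    return 0;
}
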
@@ -1355,126 +1419,147 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
     bool has_runtime_filter = !_runtime_filter_descs.empty();
 
     std::visit(
-            [&](auto&& arg) {
+            [&](auto&& arg, auto has_null_value, auto build_unique, auto has_runtime_filter_value,
+                auto need_null_map_for_build) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                     ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
                             rows, block, raw_ptrs, this, state->batch_size(), offset);
-
-                    constexpr_3_bool_match<ProcessHashTableBuild<
-                            HashTableCtxType>::template Reducer>::run(has_null, _build_unique,
-                                                                      has_runtime_filter,
-                                                                      hash_table_build_process, arg,
-                                                                      &null_map_val);
+                    hash_table_build_process.template run<need_null_map_for_build, has_null_value,
+                                                          build_unique, has_runtime_filter_value>(
+                            arg, need_null_map_for_build ? &null_map_val->get_data() : nullptr);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
             },
-            _hash_table_variants);
+            _hash_table_variants, make_bool_variant(_build_side_ignore_null),
+            make_bool_variant(_build_unique), make_bool_variant(has_runtime_filter),
+            make_bool_variant(_need_null_map_for_build));
 
     return st;
 }
 
 void HashJoinNode::_hash_table_init() {
-    if (_build_expr_ctxs.size() == 1 && !_build_not_ignore_null[0]) {
-        // Single column optimization
-        switch (_build_expr_ctxs[0]->root()->result_type()) {
-        case TYPE_BOOLEAN:
-        case TYPE_TINYINT:
-            _hash_table_variants.emplace<I8HashTableContext>();
-            break;
-        case TYPE_SMALLINT:
-            _hash_table_variants.emplace<I16HashTableContext>();
-            break;
-        case TYPE_INT:
-        case TYPE_FLOAT:
-        case TYPE_DATEV2:
-            _hash_table_variants.emplace<I32HashTableContext>();
-            break;
-        case TYPE_BIGINT:
-        case TYPE_DOUBLE:
-        case TYPE_DATETIME:
-        case TYPE_DATE:
-        case TYPE_DATETIMEV2:
-            _hash_table_variants.emplace<I64HashTableContext>();
-            break;
-        case TYPE_LARGEINT:
-        case TYPE_DECIMALV2:
-        case TYPE_DECIMAL32:
-        case TYPE_DECIMAL64:
-        case TYPE_DECIMAL128: {
-            DataTypePtr& type_ptr = _build_expr_ctxs[0]->root()->data_type();
-            TypeIndex idx = _build_expr_ctxs[0]->root()->is_nullable()
-                                    ? assert_cast<const DataTypeNullable&>(*type_ptr)
-                                              .get_nested_type()
-                                              ->get_type_id()
-                                    : type_ptr->get_type_id();
-            WhichDataType which(idx);
-            if (which.is_decimal32()) {
-                _hash_table_variants.emplace<I32HashTableContext>();
-            } else if (which.is_decimal64()) {
-                _hash_table_variants.emplace<I64HashTableContext>();
-            } else {
-                _hash_table_variants.emplace<I128HashTableContext>();
-            }
-            break;
-        }
-        default:
-            _hash_table_variants.emplace<SerializedHashTableContext>();
-        }
-        return;
-    }
+    std::visit(
+            [&](auto&& join_op_variants, auto have_other_join_conjunct) {
+                using JoinOpType = std::decay_t<decltype(join_op_variants)>;
+                using RowRefListType = std::conditional_t<
+                        have_other_join_conjunct, RowRefListWithFlags,
+                        std::conditional_t<JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN ||
+                                                   JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN ||
+                                                   JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN ||
+                                                   JoinOpType::value == TJoinOp::FULL_OUTER_JOIN,
+                                           RowRefListWithFlag, RowRefList>>;
+                if (_build_expr_ctxs.size() == 1 && !_build_not_ignore_null[0]) {
+                    // Single column optimization
+                    switch (_build_expr_ctxs[0]->root()->result_type()) {
+                    case TYPE_BOOLEAN:
+                    case TYPE_TINYINT:
+                        _hash_table_variants.emplace<I8HashTableContext<RowRefListType>>();
+                        break;
+                    case TYPE_SMALLINT:
+                        _hash_table_variants.emplace<I16HashTableContext<RowRefListType>>();
+                        break;
+                    case TYPE_INT:
+                    case TYPE_FLOAT:
+                    case TYPE_DATEV2:
+                        _hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+                        break;
+                    case TYPE_BIGINT:
+                    case TYPE_DOUBLE:
+                    case TYPE_DATETIME:
+                    case TYPE_DATE:
+                    case TYPE_DATETIMEV2:
+                        _hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+                        break;
+                    case TYPE_LARGEINT:
+                    case TYPE_DECIMALV2:
+                    case TYPE_DECIMAL32:
+                    case TYPE_DECIMAL64:
+                    case TYPE_DECIMAL128: {
+                        DataTypePtr& type_ptr = _build_expr_ctxs[0]->root()->data_type();
+                        TypeIndex idx = _build_expr_ctxs[0]->root()->is_nullable()
+                                                ? assert_cast<const DataTypeNullable&>(*type_ptr)
+                                                          .get_nested_type()
+                                                          ->get_type_id()
+                                                : type_ptr->get_type_id();
+                        WhichDataType which(idx);
+                        if (which.is_decimal32()) {
+                            _hash_table_variants.emplace<I32HashTableContext<RowRefListType>>();
+                        } else if (which.is_decimal64()) {
+                            _hash_table_variants.emplace<I64HashTableContext<RowRefListType>>();
+                        } else {
+                            _hash_table_variants.emplace<I128HashTableContext<RowRefListType>>();
+                        }
+                        break;
+                    }
+                    default:
+                        _hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+                    }
+                    return;
+                }
 
-    bool use_fixed_key = true;
-    bool has_null = false;
-    int key_byte_size = 0;
+                bool use_fixed_key = true;
+                bool has_null = false;
+                int key_byte_size = 0;
 
-    _probe_key_sz.resize(_probe_expr_ctxs.size());
-    _build_key_sz.resize(_build_expr_ctxs.size());
+                _probe_key_sz.resize(_probe_expr_ctxs.size());
+                _build_key_sz.resize(_build_expr_ctxs.size());
 
-    for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
-        const auto vexpr = _build_expr_ctxs[i]->root();
-        const auto& data_type = vexpr->data_type();
+                for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
+                    const auto vexpr = _build_expr_ctxs[i]->root();
+                    const auto& data_type = vexpr->data_type();
 
-        if (!data_type->have_maximum_size_of_value()) {
-            use_fixed_key = false;
-            break;
-        }
+                    if (!data_type->have_maximum_size_of_value()) {
+                        use_fixed_key = false;
+                        break;
+                    }
 
-        auto is_null = data_type->is_nullable();
-        has_null |= is_null;
-        _build_key_sz[i] = data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
-        _probe_key_sz[i] = _build_key_sz[i];
-        key_byte_size += _probe_key_sz[i];
-    }
+                    auto is_null = data_type->is_nullable();
+                    has_null |= is_null;
+                    _build_key_sz[i] =
+                            data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
+                    _probe_key_sz[i] = _build_key_sz[i];
+                    key_byte_size += _probe_key_sz[i];
+                }
 
-    if (std::tuple_size<KeysNullMap<UInt256>>::value + key_byte_size > sizeof(UInt256)) {
-        use_fixed_key = false;
-    }
+                if (std::tuple_size<KeysNullMap<UInt256>>::value + key_byte_size >
+                    sizeof(UInt256)) {
+                    use_fixed_key = false;
+                }
 
-    if (use_fixed_key) {
-        // TODO: may we should support uint256 in the future
-        if (has_null) {
-            if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants.emplace<I64FixedKeyHashTableContext<true>>();
-            } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
-                       sizeof(UInt128)) {
-                _hash_table_variants.emplace<I128FixedKeyHashTableContext<true>>();
-            } else {
-                _hash_table_variants.emplace<I256FixedKeyHashTableContext<true>>();
-            }
-        } else {
-            if (key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants.emplace<I64FixedKeyHashTableContext<false>>();
-            } else if (key_byte_size <= sizeof(UInt128)) {
-                _hash_table_variants.emplace<I128FixedKeyHashTableContext<false>>();
-            } else {
-                _hash_table_variants.emplace<I256FixedKeyHashTableContext<false>>();
-            }
-        }
-    } else {
-        _hash_table_variants.emplace<SerializedHashTableContext>();
-    }
+                if (use_fixed_key) {
+                    // TODO: may we should support uint256 in the future
+                    if (has_null) {
+                        if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <=
+                            sizeof(UInt64)) {
+                            _hash_table_variants
+                                    .emplace<I64FixedKeyHashTableContext<true, RowRefListType>>();
+                        } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
+                                   sizeof(UInt128)) {
+                            _hash_table_variants
+                                    .emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
+                        } else {
+                            _hash_table_variants
+                                    .emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
+                        }
+                    } else {
+                        if (key_byte_size <= sizeof(UInt64)) {
+                            _hash_table_variants
+                                    .emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
+                        } else if (key_byte_size <= sizeof(UInt128)) {
+                            _hash_table_variants
+                                    .emplace<I128FixedKeyHashTableContext<false, RowRefListType>>();
+                        } else {
+                            _hash_table_variants
+                                    .emplace<I256FixedKeyHashTableContext<false, RowRefListType>>();
+                        }
+                    }
+                } else {
+                    _hash_table_variants.emplace<SerializedHashTableContext<RowRefListType>>();
+                }
+            },
+            _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
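
The _process_build_block and _hash_table_init changes above thread several runtime booleans (_build_side_ignore_null, _build_unique, the runtime-filter flag, _need_null_map_for_build, _have_other_join_conjunct) through std::visit as make_bool_variant(...) arguments, so inside the lambda they behave like compile-time constants and unused branches are never instantiated. A minimal standalone sketch of that idiom follows; the make_bool_variant definition shown here is a simplified assumption, not the actual Doris helper:

#include <iostream>
#include <type_traits>
#include <variant>

using BoolVariant = std::variant<std::false_type, std::true_type>;

// Simplified assumption of what a make_bool_variant-style helper can look like.
inline BoolVariant make_bool_variant(bool b) {
    if (b) {
        return std::true_type {};
    }
    return std::false_type {};
}

int main() {
    bool build_unique = true;        // would come from the plan / runtime state
    bool has_runtime_filter = false;

    std::visit(
            [](auto unique_tag, auto filter_tag) {
                // Both tags are integral_constants, so these branches are resolved
                // at compile time and the dead ones are never instantiated.
                if constexpr (decltype(unique_tag)::value) {
                    std::cout << "unique-build specialization\n";
                } else {
                    std::cout << "non-unique-build specialization\n";
                }
                if constexpr (decltype(filter_tag)::value) {
                    std::cout << "runtime-filter specialization\n";
                }
            },
            make_bool_variant(build_unique), make_bool_variant(has_runtime_filter));
    return 0;
}
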
@@ -1548,7 +1633,7 @@ Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_bloc
     return Status::OK();
 }
 
-void HashJoinNode::_add_tuple_is_null_column(doris::vectorized::Block* block) {
+void HashJoinNode::_add_tuple_is_null_column(Block* block) {
     if (_is_outer_join) {
         auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
         auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 2be992d5bd..5de6b0be9c 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -32,8 +32,9 @@
 namespace doris {
 namespace vectorized {
 
+template <typename RowRefListType>
 struct SerializedHashTableContext {
-    using Mapped = RowRefList;
+    using Mapped = RowRefListType;
     using HashTable = HashMap<StringRef, Mapped>;
     using State = ColumnsHashing::HashMethodSerialized<typename HashTable::value_type, Mapped>;
     using Iter = typename HashTable::iterator;
@@ -61,9 +62,9 @@ struct IsSerializedHashTableContextTraits<ColumnsHashing::HashMethodSerialized<V
 };
 
 // T should be UInt32 UInt64 UInt128
-template <class T>
+template <class T, typename RowRefListType>
 struct PrimaryTypeHashTableContext {
-    using Mapped = RowRefList;
+    using Mapped = RowRefListType;
     using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
     using State =
             ColumnsHashing::HashMethodOneNumber<typename HashTable::value_type, Mapped, T, false>;
@@ -82,16 +83,22 @@ struct PrimaryTypeHashTableContext {
 };
 
 // TODO: use FixedHashTable instead of HashTable
-using I8HashTableContext = PrimaryTypeHashTableContext<UInt8>;
-using I16HashTableContext = PrimaryTypeHashTableContext<UInt16>;
-using I32HashTableContext = PrimaryTypeHashTableContext<UInt32>;
-using I64HashTableContext = PrimaryTypeHashTableContext<UInt64>;
-using I128HashTableContext = PrimaryTypeHashTableContext<UInt128>;
-using I256HashTableContext = PrimaryTypeHashTableContext<UInt256>;
-
-template <class T, bool has_null>
+template <typename RowRefListType>
+using I8HashTableContext = PrimaryTypeHashTableContext<UInt8, RowRefListType>;
+template <typename RowRefListType>
+using I16HashTableContext = PrimaryTypeHashTableContext<UInt16, RowRefListType>;
+template <typename RowRefListType>
+using I32HashTableContext = PrimaryTypeHashTableContext<UInt32, RowRefListType>;
+template <typename RowRefListType>
+using I64HashTableContext = PrimaryTypeHashTableContext<UInt64, RowRefListType>;
+template <typename RowRefListType>
+using I128HashTableContext = PrimaryTypeHashTableContext<UInt128, RowRefListType>;
+template <typename RowRefListType>
+using I256HashTableContext = PrimaryTypeHashTableContext<UInt256, RowRefListType>;
+
+template <class T, bool has_null, typename RowRefListType>
 struct FixedKeyHashTableContext {
-    using Mapped = RowRefList;
+    using Mapped = RowRefListType;
     using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
     using State = ColumnsHashing::HashMethodKeysFixed<typename HashTable::value_type, T, Mapped,
                                                       has_null, false>;
@@ -109,22 +116,45 @@ struct FixedKeyHashTableContext {
     }
 };
 
-template <bool has_null>
-using I64FixedKeyHashTableContext = FixedKeyHashTableContext<UInt64, has_null>;
-
-template <bool has_null>
-using I128FixedKeyHashTableContext = FixedKeyHashTableContext<UInt128, has_null>;
-
-template <bool has_null>
-using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256, has_null>;
-
-using HashTableVariants =
-        std::variant<std::monostate, SerializedHashTableContext, I8HashTableContext,
-                     I16HashTableContext, I32HashTableContext, I64HashTableContext,
-                     I128HashTableContext, I256HashTableContext, I64FixedKeyHashTableContext<true>,
-                     I64FixedKeyHashTableContext<false>, I128FixedKeyHashTableContext<true>,
-                     I128FixedKeyHashTableContext<false>, I256FixedKeyHashTableContext<true>,
-                     I256FixedKeyHashTableContext<false>>;
+template <bool has_null, typename RowRefListType>
+using I64FixedKeyHashTableContext = FixedKeyHashTableContext<UInt64, has_null, RowRefListType>;
+
+template <bool has_null, typename RowRefListType>
+using I128FixedKeyHashTableContext = FixedKeyHashTableContext<UInt128, has_null, RowRefListType>;
+
+template <bool has_null, typename RowRefListType>
+using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256, has_null, RowRefListType>;
+
+using HashTableVariants = std::variant<
+        std::monostate, SerializedHashTableContext<RowRefList>, I8HashTableContext<RowRefList>,
+        I16HashTableContext<RowRefList>, I32HashTableContext<RowRefList>,
+        I64HashTableContext<RowRefList>, I128HashTableContext<RowRefList>,
+        I256HashTableContext<RowRefList>, I64FixedKeyHashTableContext<true, RowRefList>,
+        I64FixedKeyHashTableContext<false, RowRefList>,
+        I128FixedKeyHashTableContext<true, RowRefList>,
+        I128FixedKeyHashTableContext<false, RowRefList>,
+        I256FixedKeyHashTableContext<true, RowRefList>,
+        I256FixedKeyHashTableContext<false, RowRefList>,
+        SerializedHashTableContext<RowRefListWithFlag>, I8HashTableContext<RowRefListWithFlag>,
+        I16HashTableContext<RowRefListWithFlag>, I32HashTableContext<RowRefListWithFlag>,
+        I64HashTableContext<RowRefListWithFlag>, I128HashTableContext<RowRefListWithFlag>,
+        I256HashTableContext<RowRefListWithFlag>,
+        I64FixedKeyHashTableContext<true, RowRefListWithFlag>,
+        I64FixedKeyHashTableContext<false, RowRefListWithFlag>,
+        I128FixedKeyHashTableContext<true, RowRefListWithFlag>,
+        I128FixedKeyHashTableContext<false, RowRefListWithFlag>,
+        I256FixedKeyHashTableContext<true, RowRefListWithFlag>,
+        I256FixedKeyHashTableContext<false, RowRefListWithFlag>,
+        SerializedHashTableContext<RowRefListWithFlags>, I8HashTableContext<RowRefListWithFlags>,
+        I16HashTableContext<RowRefListWithFlags>, I32HashTableContext<RowRefListWithFlags>,
+        I64HashTableContext<RowRefListWithFlags>, I128HashTableContext<RowRefListWithFlags>,
+        I256HashTableContext<RowRefListWithFlags>,
+        I64FixedKeyHashTableContext<true, RowRefListWithFlags>,
+        I64FixedKeyHashTableContext<false, RowRefListWithFlags>,
+        I128FixedKeyHashTableContext<true, RowRefListWithFlags>,
+        I128FixedKeyHashTableContext<false, RowRefListWithFlags>,
+        I256FixedKeyHashTableContext<true, RowRefListWithFlags>,
+        I256FixedKeyHashTableContext<false, RowRefListWithFlags>>;
 
 using JoinOpVariants =
         std::variant<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>,
@@ -170,18 +200,18 @@ struct ProcessHashTableProbe {
     // the output block's structure is the same as the mutable block's. we can do more opt on it and simplify
     // the logic of probe
     // TODO: opt the visited here to reduce the size of hash table
-    template <typename HashTableType>
+    template <bool need_null_map_for_probe, typename HashTableType>
     Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
                       MutableBlock& mutable_block, Block* output_block, size_t probe_rows);
     // In the presence of other join conjuncts, the join process becomes more complicated.
     // each matching join column needs to be processed by the other join conjuncts. so the struct of the mutable block
     // and the output block may be different
     // The output result is determined by the other join conjunct results and the same_to_prev struct
-    template <typename HashTableType>
-    Status do_process_with_other_join_conjunts(HashTableType& hash_table_ctx,
-                                               ConstNullMapPtr null_map,
-                                               MutableBlock& mutable_block, Block* output_block,
-                                               size_t probe_rows);
+    template <bool need_null_map_for_probe, typename HashTableType>
+    Status do_process_with_other_join_conjuncts(HashTableType& hash_table_ctx,
+                                                ConstNullMapPtr null_map,
+                                                MutableBlock& mutable_block, Block* output_block,
+                                                size_t probe_rows);
 
     // Process full outer join/ right join / right semi/anti join to output the join result
     // in hash table
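
As the comments above note, probing with other join conjuncts is more involved: the equality keys only produce candidate matches, the residual conjunct decides which of them really qualify, and right/full outer semantics depend on remembering which build rows survived that filter (which is why RowRefListWithFlags carries a flag per row). A toy standalone sketch of that flow, using std::unordered_multimap and a plain visited vector instead of the Doris hash table and row-ref lists:

#include <iostream>
#include <unordered_map>
#include <vector>

struct BuildRow { int key; int v; };
struct ProbeRow { int key; int v; };

int main() {
    std::vector<BuildRow> build = {{1, 10}, {1, 20}, {2, 30}};
    std::vector<ProbeRow> probe = {{1, 15}, {2, 5}};

    // Equality "hash table": join key -> index of the build row.
    std::unordered_multimap<int, size_t> ht;
    for (size_t i = 0; i < build.size(); ++i) {
        ht.emplace(build[i].key, i);
    }

    // One flag per build row, playing the role of the per-row visited flags.
    std::vector<bool> visited(build.size(), false);

    for (const auto& p : probe) {
        auto range = ht.equal_range(p.key);
        for (auto it = range.first; it != range.second; ++it) {
            const BuildRow& b = build[it->second];
            if (p.v < b.v) {                 // the residual "other" conjunct
                visited[it->second] = true;  // this candidate really matched
                std::cout << "match: probe(" << p.key << "," << p.v << ") build("
                          << b.key << "," << b.v << ")\n";
            }
        }
    }

    // A right/full outer join would now emit every build row whose flag is still false.
    for (size_t i = 0; i < build.size(); ++i) {
        if (!visited[i]) {
            std::cout << "unmatched build row: (" << build[i].key << "," << build[i].v << ")\n";
        }
    }
    return 0;
}
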
@@ -325,10 +355,16 @@ private:
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
+    bool _need_null_map_for_probe = false;
+    bool _has_set_need_null_map_for_probe = false;
+    bool _need_null_map_for_build = false;
+    bool _has_set_need_null_map_for_build = false;
     bool _probe_ignore_null = false;
     int _probe_index = -1;
     bool _probe_eos = false;
 
+    bool _build_side_ignore_null = false;
+
     Sizes _probe_key_sz;
     Sizes _build_key_sz;
 
@@ -359,11 +395,15 @@ private:
 
     Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset);
 
-    Status _extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
-                                      bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
+    Status _do_evaluate(Block& block, std::vector<VExprContext*>& exprs,
+                        RuntimeProfile::Counter& expr_call_timer, std::vector<int>& res_col_ids);
+
+    template <bool BuildSide>
+    Status _extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map,
+                                ColumnRawPtrs& raw_ptrs, const std::vector<int>& res_col_ids);
 
-    Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
-                                      RuntimeProfile::Counter& expr_call_timer);
+    template <bool BuildSide>
+    bool _need_null_map(Block& block, const std::vector<int>& res_col_ids);
 
     void _hash_table_init();
     void _process_hashtable_ctx_variants_init(RuntimeState* state);
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index b95714513c..736c9786d9 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -150,16 +150,16 @@ void VSetOperationNode::hash_table_init() {
         switch (_child_expr_lists[0][0]->root()->result_type()) {
         case TYPE_BOOLEAN:
         case TYPE_TINYINT:
-            _hash_table_variants.emplace<I8HashTableContext>();
+            _hash_table_variants.emplace<I8HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_SMALLINT:
-            _hash_table_variants.emplace<I16HashTableContext>();
+            _hash_table_variants.emplace<I16HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
         case TYPE_DECIMAL32:
-            _hash_table_variants.emplace<I32HashTableContext>();
+            _hash_table_variants.emplace<I32HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
@@ -167,15 +167,15 @@ void VSetOperationNode::hash_table_init() {
         case TYPE_DATE:
         case TYPE_DECIMAL64:
         case TYPE_DATETIMEV2:
-            _hash_table_variants.emplace<I64HashTableContext>();
+            _hash_table_variants.emplace<I64HashTableContext<RowRefListWithFlags>>();
             break;
         case TYPE_LARGEINT:
         case TYPE_DECIMALV2:
         case TYPE_DECIMAL128:
-            _hash_table_variants.emplace<I128HashTableContext>();
+            _hash_table_variants.emplace<I128HashTableContext<RowRefListWithFlags>>();
             break;
         default:
-            _hash_table_variants.emplace<SerializedHashTableContext>();
+            _hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
         }
         return;
     }
@@ -208,24 +208,30 @@ void VSetOperationNode::hash_table_init() {
     if (use_fixed_key) {
         if (has_null) {
             if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants.emplace<I64FixedKeyHashTableContext<true>>();
+                _hash_table_variants
+                        .emplace<I64FixedKeyHashTableContext<true, RowRefListWithFlags>>();
             } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
                        sizeof(UInt128)) {
-                _hash_table_variants.emplace<I128FixedKeyHashTableContext<true>>();
+                _hash_table_variants
+                        .emplace<I128FixedKeyHashTableContext<true, RowRefListWithFlags>>();
             } else {
-                _hash_table_variants.emplace<I256FixedKeyHashTableContext<true>>();
+                _hash_table_variants
+                        .emplace<I256FixedKeyHashTableContext<true, RowRefListWithFlags>>();
             }
         } else {
             if (key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants.emplace<I64FixedKeyHashTableContext<false>>();
+                _hash_table_variants
+                        .emplace<I64FixedKeyHashTableContext<false, RowRefListWithFlags>>();
             } else if (key_byte_size <= sizeof(UInt128)) {
-                _hash_table_variants.emplace<I128FixedKeyHashTableContext<false>>();
+                _hash_table_variants
+                        .emplace<I128FixedKeyHashTableContext<false, RowRefListWithFlags>>();
             } else {
-                _hash_table_variants.emplace<I256FixedKeyHashTableContext<false>>();
+                _hash_table_variants
+                        .emplace<I256FixedKeyHashTableContext<false, RowRefListWithFlags>>();
             }
         }
     } else {
-        _hash_table_variants.emplace<SerializedHashTableContext>();
+        _hash_table_variants.emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
 }
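
Both hash_table_init implementations follow the same sizing rule for multi-column keys: sum the fixed-width key bytes, reserve room for a null bitmap when any key is nullable, and pick the narrowest fixed-key context that fits, falling back to the serialized context otherwise. A rough standalone sketch of that rule; the one-byte null bitmap here is an illustrative assumption rather than the exact KeysNullMap size:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

std::string pick_key_type(std::size_t key_byte_size, bool has_null) {
    std::size_t total = key_byte_size + (has_null ? 1 : 0); // illustrative bitmap size
    if (total <= sizeof(std::uint64_t)) {
        return "I64FixedKeyHashTableContext";
    }
    if (total <= 2 * sizeof(std::uint64_t)) {
        return "I128FixedKeyHashTableContext";
    }
    if (total <= 4 * sizeof(std::uint64_t)) {
        return "I256FixedKeyHashTableContext";
    }
    return "SerializedHashTableContext";
}

int main() {
    // e.g. an INT key (4 bytes) plus a nullable BIGINT key (8 bytes)
    std::cout << pick_key_type(4 + 8, true) << '\n';  // I128FixedKeyHashTableContext
    // two non-nullable INT keys pack into 8 bytes
    std::cout << pick_key_type(4 + 4, false) << '\n'; // I64FixedKeyHashTableContext
    return 0;
}
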
 
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index ba4eba3013..8c20c1447e 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -102,53 +102,57 @@ void VSetOperationNode::refresh_hash_table() {
             [&](auto&& arg) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    HashTableCtxType tmp_hash_table;
-                    bool is_need_shrink =
-                            arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
-                    if (is_need_shrink) {
-                        tmp_hash_table.hash_table.init_buf_size(
-                                _valid_element_in_hash_tbl / arg.hash_table.get_factor() + 1);
-                    }
+                    if constexpr (std::is_same_v<typename HashTableCtxType::Mapped,
+                                                 RowRefListWithFlags>) {
+                        HashTableCtxType tmp_hash_table;
+                        bool is_need_shrink =
+                                arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
+                        if (is_need_shrink) {
+                            tmp_hash_table.hash_table.init_buf_size(
+                                    _valid_element_in_hash_tbl / arg.hash_table.get_factor() + 1);
+                        }
 
-                    arg.init_once();
-                    auto& iter = arg.iter;
-                    auto iter_end = arg.hash_table.end();
-                    while (iter != iter_end) {
-                        auto& mapped = iter->get_second();
-                        auto it = mapped.begin();
+                        arg.init_once();
+                        auto& iter = arg.iter;
+                        auto iter_end = arg.hash_table.end();
+                        while (iter != iter_end) {
+                            auto& mapped = iter->get_second();
+                            auto it = mapped.begin();
 
-                        if constexpr (keep_matched) { //intersected
-                            if (it->visited) {
-                                it->visited = false;
-                                if (is_need_shrink) {
-                                    tmp_hash_table.hash_table.insert(iter->get_value());
-                                }
-                                ++iter;
-                            } else {
-                                if (!is_need_shrink) {
-                                    arg.hash_table.delete_zero_key(iter->get_first());
-                                    // the ++iter would check if the current key is zero. if it does, the iterator will be moved to the container's head.
-                                    // so we do ++iter before set_zero to make the iterator move to next valid key correctly.
-                                    auto iter_prev = iter;
+                            if constexpr (keep_matched) { //intersected
+                                if (it->visited) {
+                                    it->visited = false;
+                                    if (is_need_shrink) {
+                                        tmp_hash_table.hash_table.insert(iter->get_value());
+                                    }
                                     ++iter;
-                                    iter_prev->set_zero();
                                 } else {
-                                    ++iter;
+                                    if (!is_need_shrink) {
+                                        arg.hash_table.delete_zero_key(iter->get_first());
+                                        // ++iter checks whether the current key is zero; if it is, the iterator is moved to the container's head,
+                                        // so we do ++iter before set_zero to make the iterator move to the next valid key correctly.
+                                        auto iter_prev = iter;
+                                        ++iter;
+                                        iter_prev->set_zero();
+                                    } else {
+                                        ++iter;
+                                    }
                                 }
+                            } else { //except
+                                if (!it->visited && is_need_shrink) {
+                                    tmp_hash_table.hash_table.insert(iter->get_value());
+                                }
+                                ++iter;
                             }
-                        } else { //except
-                            if (!it->visited && is_need_shrink) {
-                                tmp_hash_table.hash_table.insert(iter->get_value());
-                            }
-                            ++iter;
                         }
-                    }
 
-                    arg.inited = false;
-                    if (is_need_shrink) {
-                        arg.hash_table = std::move(tmp_hash_table.hash_table);
+                        arg.inited = false;
+                        if (is_need_shrink) {
+                            arg.hash_table = std::move(tmp_hash_table.hash_table);
+                        }
+                    } else {
+                        LOG(FATAL) << "FATAL: Invalid RowRefList";
                     }
-
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
@@ -179,24 +183,29 @@ struct HashTableProbe {
 
         KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, nullptr);
 
-        for (; _probe_index < _probe_rows;) {
-            auto find_result = key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
-            if (find_result.is_found()) { //if found, marked visited
-                auto it = find_result.get_mapped().begin();
-                if (!(it->visited)) {
-                    it->visited = true;
-                    if constexpr (is_intersected) //intersected
-                        _operation_node->_valid_element_in_hash_tbl++;
-                    else
-                        _operation_node->_valid_element_in_hash_tbl--; //except
+        if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) {
+            for (; _probe_index < _probe_rows;) {
+                auto find_result =
+                        key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
+                if (find_result.is_found()) { //if found, mark it visited
+                    auto it = find_result.get_mapped().begin();
+                    if (!(it->visited)) {
+                        it->visited = true;
+                        if constexpr (is_intersected) //intersected
+                            _operation_node->_valid_element_in_hash_tbl++;
+                        else
+                            _operation_node->_valid_element_in_hash_tbl--; //except
+                    }
                 }
+                _probe_index++;
             }
-            _probe_index++;
+        } else {
+            LOG(FATAL) << "Invalid RowRefListType!";
         }
         return Status::OK();
     }
 
-    void add_result_columns(RowRefList& value, int& block_size) {
+    void add_result_columns(RowRefListWithFlags& value, int& block_size) {
         auto it = value.begin();
         for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) {
             auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column;
@@ -213,18 +222,22 @@ struct HashTableProbe {
         auto& iter = hash_table_ctx.iter;
         auto block_size = 0;
 
-        for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) {
-            auto& value = iter->get_second();
-            auto it = value.begin();
-            if constexpr (is_intersected) {
-                if (it->visited) { //intersected: have done probe, so visited values it's the result
-                    add_result_columns(value, block_size);
-                }
-            } else {
-                if (!it->visited) { //except: haven't visited values it's the needed result
-                    add_result_columns(value, block_size);
+        if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) {
+            for (; iter != hash_table_ctx.hash_table.end() && block_size < _batch_size; ++iter) {
+                auto& value = iter->get_second();
+                auto it = value.begin();
+                if constexpr (is_intersected) {
+                    if (it->visited) { //intersected: the probe is done, so the visited values are the result
+                        add_result_columns(value, block_size);
+                    }
+                } else {
+                    if (!it->visited) { //except: values that were not visited are the needed result
+                        add_result_columns(value, block_size);
+                    }
                 }
             }
+        } else {
+            LOG(FATAL) << "Invalid RowRefListType!";
         }
 
         *eos = iter == hash_table_ctx.hash_table.end();
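
Taken together, the VSetOperationNode changes keep the INTERSECT/EXCEPT flow intact but now require the mapped type to be RowRefListWithFlags: build rows start out unvisited, the probe phase marks the keys it finds, and the output phase keeps the rows whose flag matches the operation. A standalone toy sketch of that flow, with a plain std::unordered_map and a bare visited bool standing in for the Doris structures:

#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
    const bool is_intersected = true; // flip to false for EXCEPT semantics

    std::vector<int> build = {1, 2, 3, 4};
    std::vector<int> probe = {2, 4, 5};

    // Build phase: every key starts out unvisited.
    std::unordered_map<int, bool> ht; // key -> visited flag
    for (int k : build) {
        ht.emplace(k, false);
    }

    // Probe phase: mark keys that also appear on the probe side.
    for (int k : probe) {
        auto it = ht.find(k);
        if (it != ht.end()) {
            it->second = true;
        }
    }

    // Output phase: INTERSECT keeps visited keys, EXCEPT keeps the unvisited ones.
    for (const auto& [key, visited] : ht) {
        if (visited == is_intersected) {
            std::cout << key << '\n';
        }
    }
    return 0;
}
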

