Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/25 14:18:14 UTC

[doris] branch master updated: Revert "[fix](vectorized) Support outer join for vectorized exec engine (#10323)" (#10424)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eebfbd0c91 Revert "[fix](vectorized) Support outer join for vectorized exec engine (#10323)" (#10424)
eebfbd0c91 is described below

commit eebfbd0c91588a9448612a3df93f5aeab85a9458
Author: Gabriel <ga...@gmail.com>
AuthorDate: Sat Jun 25 22:18:08 2022 +0800

    Revert "[fix](vectorized) Support outer join for vectorized exec engine (#10323)" (#10424)
    
    This reverts commit 2cc670dba697a330358ae7d485d856e4b457c679.
---
 be/src/exec/exec_node.cpp                          |   4 +-
 be/src/exec/exec_node.h                            |   2 +-
 be/src/vec/columns/column_nullable.cpp             |   7 -
 be/src/vec/columns/column_nullable.h               |   1 -
 be/src/vec/core/block.cpp                          |   4 +-
 be/src/vec/exec/join/vhash_join_node.cpp           | 213 ++++------------
 be/src/vec/exec/join/vhash_join_node.h             |  36 +--
 .../java/org/apache/doris/analysis/Analyzer.java   |  65 +++++
 .../org/apache/doris/analysis/DescriptorTable.java |  17 --
 .../apache/doris/analysis/ExprSubstitutionMap.java |  79 +-----
 .../java/org/apache/doris/analysis/SelectStmt.java |   8 +
 .../java/org/apache/doris/analysis/TableRef.java   |   5 +
 ...ectorizedUtil.java => VecNotImplException.java} |  20 +-
 .../apache/doris/common/util/VectorizedUtil.java   |  35 +++
 .../org/apache/doris/planner/AggregationNode.java  |  16 +-
 .../org/apache/doris/planner/HashJoinNode.java     | 277 ++-------------------
 .../org/apache/doris/planner/OlapScanNode.java     |   1 +
 .../java/org/apache/doris/planner/PlanNode.java    |  15 +-
 .../org/apache/doris/planner/ProjectPlanner.java   |   3 +-
 .../org/apache/doris/planner/SetOperationNode.java |   1 -
 .../apache/doris/planner/SingleNodePlanner.java    |  18 +-
 .../java/org/apache/doris/planner/SortNode.java    |   2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   9 +
 .../org/apache/doris/analysis/QueryStmtTest.java   |   8 +
 .../doris/planner/ProjectPlannerFunctionTest.java  |   4 +-
 .../org/apache/doris/planner/QueryPlanTest.java    |  35 ++-
 gensrc/thrift/PlanNodes.thrift                     |   4 -
 27 files changed, 254 insertions(+), 635 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index d0de6aa326..4030b552ac 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -214,9 +214,9 @@ Status ExecNode::prepare(RuntimeState* state) {
                                                    _mem_tracker);
 
     if (_vconjunct_ctx_ptr) {
-        RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _row_descriptor, expr_mem_tracker()));
+        RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, row_desc(), expr_mem_tracker()));
     }
-    RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor, expr_mem_tracker()));
+    RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));
 
     // TODO(zc):
     // AddExprCtxsToFree(_conjunct_ctxs);
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index e0a1428952..35e495d158 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -182,7 +182,7 @@ public:
 
     int id() const { return _id; }
     TPlanNodeType::type type() const { return _type; }
-    virtual const RowDescriptor& row_desc() const { return _row_descriptor; }
+    const RowDescriptor& row_desc() const { return _row_descriptor; }
     int64_t rows_returned() const { return _num_rows_returned; }
     int64_t limit() const { return _limit; }
     bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index acb8e7787c..215ced2383 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -464,11 +464,4 @@ ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) {
     return ColumnNullable::create(column, ColumnUInt8::create(column->size(), is_nullable ? 1 : 0));
 }
 
-ColumnPtr remove_nullable(const ColumnPtr& column) {
-    if (is_column_nullable(*column)) {
-        return reinterpret_cast<const ColumnNullable*>(column.get())->get_nested_column_ptr();
-    }
-    return column;
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 3b9465e9b4..e6f4495c30 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -289,6 +289,5 @@ private:
 };
 
 ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable = false);
-ColumnPtr remove_nullable(const ColumnPtr& column);
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index c95a97e6c9..3e50c1578b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -601,7 +601,7 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t
     auto count = count_bytes_in_filter(filter);
     if (count == 0) {
         for (size_t i = 0; i < column_to_keep; ++i) {
-            std::move(*block->get_by_position(i).column).assume_mutable()->clear();
+            std::move(*block->get_by_position(i).column).mutate()->clear();
         }
     } else {
         if (count != block->rows()) {
@@ -651,7 +651,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
         bool ret = const_column->get_bool(0);
         if (!ret) {
             for (size_t i = 0; i < column_to_keep; ++i) {
-                std::move(*block->get_by_position(i).column).assume_mutable()->clear();
+                std::move(*block->get_by_position(i).column).mutate()->clear();
             }
         }
     } else {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 7aba12bee1..4471034ea4 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -489,7 +489,7 @@ struct ProcessHashTableProbe {
                             typeid_cast<ColumnNullable*>(
                                     std::move(*output_block->get_by_position(j + right_col_idx)
                                                        .column)
-                                            .assume_mutable()
+                                            .mutate()
                                             .get())
                                     ->get_null_map_data()[i] = true;
                         }
@@ -587,9 +587,7 @@ struct ProcessHashTableProbe {
         auto& mcol = mutable_block.mutable_columns();
 
         int right_col_idx =
-                (_join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct)
-                        ? 0
-                        : _join_node->_left_table_data_types.size();
+                _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size();
         int right_col_len = _join_node->_right_table_data_types.size();
 
         auto& iter = hash_table_ctx.iter;
@@ -626,8 +624,7 @@ struct ProcessHashTableProbe {
         }
         *eos = iter == hash_table_ctx.hash_table.end();
 
-        output_block->swap(
-                mutable_block.to_block(_join_node->_is_right_semi_anti ? right_col_idx : 0));
+        output_block->swap(mutable_block.to_block());
         return Status::OK();
     }
 
@@ -667,8 +664,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _is_outer_join(_match_all_build || _match_all_probe),
           _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
                                         ? tnode.hash_join_node.hash_output_slot_ids
-                                        : std::vector<SlotId> {}),
-          _output_row_desc(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}) {
+                                        : std::vector<SlotId> {}) {
     _runtime_filter_descs = tnode.runtime_filters;
     init_join_op();
 
@@ -692,8 +688,8 @@ void HashJoinNode::init_join_op() {
         //do nothing
         break;
     }
+    return;
 }
-
 Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     DCHECK(tnode.__isset.hash_join_node);
@@ -709,15 +705,15 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
             _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
 
     const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts;
-    for (const auto& eq_join_conjunct : eq_join_conjuncts) {
+    for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
         VExprContext* ctx = nullptr;
-        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx));
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].left, &ctx));
         _probe_expr_ctxs.push_back(ctx);
-        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.right, &ctx));
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].right, &ctx));
         _build_expr_ctxs.push_back(ctx);
 
-        bool null_aware = eq_join_conjunct.__isset.opcode &&
-                          eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL;
+        bool null_aware = eq_join_conjuncts[i].__isset.opcode &&
+                          eq_join_conjuncts[i].opcode == TExprOpcode::EQ_FOR_NULL;
         _is_null_safe_eq_join.push_back(null_aware);
 
         // if is null aware, build join column and probe join column both need dispose null value
@@ -741,13 +737,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _have_other_join_conjunct = true;
     }
 
-    const auto& output_exprs = tnode.hash_join_node.srcExprList;
-    for (const auto& expr : output_exprs) {
-        VExprContext* ctx = nullptr;
-        RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx));
-        _output_expr_ctxs.push_back(ctx);
-    }
-
     for (const auto& filter_desc : _runtime_filter_descs) {
         RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
                 RuntimeFilterRole::PRODUCER, filter_desc, state->query_options()));
@@ -814,16 +803,12 @@ Status HashJoinNode::prepare(RuntimeState* state) {
                 (*_vother_join_conjunct_ptr)
                         ->prepare(state, _row_desc_for_other_join_conjunt, expr_mem_tracker()));
     }
-
-    RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_descriptor, expr_mem_tracker()));
-
     // right table data types
     _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc());
     _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc());
 
     // Hash Table Init
     _hash_table_init();
-    _construct_mutable_join_block();
 
     _build_block_offsets.resize(state->batch_size());
     _build_block_rows.resize(state->batch_size());
@@ -838,7 +823,6 @@ Status HashJoinNode::close(RuntimeState* state) {
     VExpr::close(_build_expr_ctxs, state);
     VExpr::close(_probe_expr_ctxs, state);
     if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
-    VExpr::close(_output_expr_ctxs, state);
 
     _hash_table_mem_tracker->release(_mem_used);
 
@@ -856,7 +840,17 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
     size_t probe_rows = _probe_block.rows();
     if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) {
         _probe_index = 0;
-        _prepare_probe_block();
+        // clear_column_data of _probe_block
+        {
+            if (!_probe_column_disguise_null.empty()) {
+                for (int i = 0; i < _probe_column_disguise_null.size(); ++i) {
+                    auto column_to_erase = _probe_column_disguise_null[i];
+                    _probe_block.erase(column_to_erase - i);
+                }
+                _probe_column_disguise_null.clear();
+            }
+            release_block_memory(_probe_block);
+        }
 
         do {
             SCOPED_TIMER(_probe_next_timer);
@@ -866,9 +860,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
         probe_rows = _probe_block.rows();
         if (probe_rows != 0) {
             COUNTER_UPDATE(_probe_rows_counter, probe_rows);
-            if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
-                _probe_column_convert_to_null = _convert_block_to_null(_probe_block);
-            }
 
             int probe_expr_ctxs_sz = _probe_expr_ctxs.size();
             _probe_columns.resize(probe_expr_ctxs_sz);
@@ -882,9 +873,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                             auto& null_map_val = _null_map_column->get_data();
-                            return _extract_probe_join_column(_probe_block, null_map_val,
-                                                              _probe_columns, _probe_ignore_null,
-                                                              *_probe_expr_call_timer);
+                            return extract_probe_join_column(_probe_block, null_map_val,
+                                                             _probe_columns, _probe_ignore_null,
+                                                             *_probe_expr_call_timer);
                         } else {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
@@ -897,9 +888,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
     }
 
     Status st;
-    _join_block.clear_column_data();
-    MutableBlock mutable_join_block(&_join_block);
-    Block temp_block;
 
     if (_probe_index < _probe_block.rows()) {
         std::visit(
@@ -908,22 +896,33 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
                     using HashTableCtxType = std::decay_t<decltype(arg)>;
                     using JoinOpType = std::decay_t<decltype(join_op_variants)>;
                     if constexpr (have_other_join_conjunct) {
+                        MutableBlock mutable_block(
+                                VectorizedUtils::create_empty_columnswithtypename(
+                                        _row_desc_for_other_join_conjunt));
+
                         if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                             ProcessHashTableProbe<HashTableCtxType, JoinOpType, probe_ignore_null>
                                     process_hashtable_ctx(this, state->batch_size(), probe_rows);
                             st = process_hashtable_ctx.do_process_with_other_join_conjunts(
-                                    arg, &_null_map_column->get_data(), mutable_join_block,
-                                    &temp_block);
+                                    arg, &_null_map_column->get_data(), mutable_block,
+                                    output_block);
                         } else {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
                     } else {
+                        MutableBlock mutable_block =
+                                output_block->mem_reuse()
+                                        ? MutableBlock(output_block)
+                                        : MutableBlock(
+                                                  VectorizedUtils::create_empty_columnswithtypename(
+                                                          row_desc()));
+
                         if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                             ProcessHashTableProbe<HashTableCtxType, JoinOpType, probe_ignore_null>
                                     process_hashtable_ctx(this, state->batch_size(), probe_rows);
                             st = process_hashtable_ctx.do_process(arg,
                                                                   &_null_map_column->get_data(),
-                                                                  mutable_join_block, &temp_block);
+                                                                  mutable_block, output_block);
                         } else {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
@@ -934,6 +933,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
                 make_bool_variant(_probe_ignore_null));
     } else if (_probe_eos) {
         if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
+            MutableBlock mutable_block(
+                    VectorizedUtils::create_empty_columnswithtypename(row_desc()));
             std::visit(
                     [&](auto&& arg, auto&& join_op_variants) {
                         using JoinOpType = std::decay_t<decltype(join_op_variants)>;
@@ -941,8 +942,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
                         if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                             ProcessHashTableProbe<HashTableCtxType, JoinOpType, false>
                                     process_hashtable_ctx(this, state->batch_size(), probe_rows);
-                            st = process_hashtable_ctx.process_data_in_hashtable(
-                                    arg, mutable_join_block, &temp_block, eos);
+                            st = process_hashtable_ctx.process_data_in_hashtable(arg, mutable_block,
+                                                                                 output_block, eos);
                         } else {
                             LOG(FATAL) << "FATAL: uninited hash table";
                         }
@@ -957,74 +958,12 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
     }
 
     RETURN_IF_ERROR(
-            VExprContext::filter_block(_vconjunct_ctx_ptr, &temp_block, temp_block.columns()));
-    RETURN_IF_ERROR(_build_output_block(&temp_block, output_block));
+            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns()));
     reached_limit(output_block, eos);
 
     return st;
 }
 
-void HashJoinNode::_prepare_probe_block() {
-    // clear_column_data of _probe_block
-    if (!_probe_column_disguise_null.empty()) {
-        for (int i = 0; i < _probe_column_disguise_null.size(); ++i) {
-            auto column_to_erase = _probe_column_disguise_null[i];
-            _probe_block.erase(column_to_erase - i);
-        }
-        _probe_column_disguise_null.clear();
-    }
-
-    // remove add nullmap of probe columns
-    for (auto index : _probe_column_convert_to_null) {
-        auto& column_type = _probe_block.safe_get_by_position(index);
-        DCHECK(column_type.column->is_nullable());
-        DCHECK(column_type.type->is_nullable());
-
-        column_type.column = remove_nullable(column_type.column);
-        column_type.type = remove_nullable(column_type.type);
-    }
-    release_block_memory(_probe_block);
-}
-
-void HashJoinNode::_construct_mutable_join_block() {
-    const auto& mutable_block_desc =
-            _have_other_join_conjunct ? _row_desc_for_other_join_conjunt : _row_descriptor;
-
-    // TODO: Support Intermediate tuple in FE to delete the dispose the convert null operation
-    // here
-    auto [start_convert_null, end_convert_null] = std::pair {0, 0};
-
-    switch (_join_op) {
-    case TJoinOp::LEFT_OUTER_JOIN: {
-        start_convert_null = child(0)->row_desc().num_materialized_slots();
-        end_convert_null = child(0)->row_desc().num_materialized_slots() +
-                           child(1)->row_desc().num_materialized_slots();
-        break;
-    }
-    case TJoinOp::RIGHT_OUTER_JOIN: {
-        end_convert_null = child(0)->row_desc().num_materialized_slots();
-        break;
-    }
-    case TJoinOp::FULL_OUTER_JOIN: {
-        end_convert_null = child(0)->row_desc().num_materialized_slots() +
-                           child(1)->row_desc().num_materialized_slots();
-        break;
-    }
-    default:
-        break;
-    }
-
-    for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) {
-        for (const auto slot_desc : tuple_desc->slots()) {
-            auto offset = _join_block.columns();
-            auto type_ptr = (offset >= start_convert_null && offset < end_convert_null)
-                                    ? make_nullable(slot_desc->get_data_type_ptr())
-                                    : slot_desc->get_data_type_ptr();
-            _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()});
-        }
-    }
-}
-
 Status HashJoinNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
@@ -1112,9 +1051,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) {
 }
 
 // TODO:: unify the code of extract probe join column
-Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map,
-                                                ColumnRawPtrs& raw_ptrs, bool& ignore_null,
-                                                RuntimeProfile::Counter& expr_call_timer) {
+Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map,
+                                               ColumnRawPtrs& raw_ptrs, bool& ignore_null,
+                                               RuntimeProfile::Counter& expr_call_timer) {
     for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) {
         int result_col_id = -1;
         // execute build column
@@ -1150,9 +1089,9 @@ Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map,
     return Status::OK();
 }
 
-Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map,
-                                                ColumnRawPtrs& raw_ptrs, bool& ignore_null,
-                                                RuntimeProfile::Counter& expr_call_timer) {
+Status HashJoinNode::extract_probe_join_column(Block& block, NullMap& null_map,
+                                               ColumnRawPtrs& raw_ptrs, bool& ignore_null,
+                                               RuntimeProfile::Counter& expr_call_timer) {
     for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
         int result_col_id = -1;
         // execute build column
@@ -1204,9 +1143,6 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
     }
     COUNTER_UPDATE(_build_rows_counter, rows);
 
-    if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
-        _convert_block_to_null(block);
-    }
     ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
 
     NullMap null_map_val(rows);
@@ -1218,8 +1154,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
             [&](auto&& arg) -> Status {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
-                    return _extract_build_join_column(block, null_map_val, raw_ptrs, has_null,
-                                                      *_build_expr_call_timer);
+                    return extract_build_join_column(block, null_map_val, raw_ptrs, has_null,
+                                                     *_build_expr_call_timer);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                 }
@@ -1337,51 +1273,4 @@ void HashJoinNode::_hash_table_init() {
         _hash_table_variants.emplace<SerializedHashTableContext>();
     }
 }
-
-std::vector<uint16_t> HashJoinNode::_convert_block_to_null(Block& block) {
-    std::vector<uint16_t> results;
-    for (int i = 0; i < block.columns(); ++i) {
-        if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) {
-            DCHECK(!column_type.column->is_nullable());
-            column_type.column = make_nullable(column_type.column);
-            column_type.type = make_nullable(column_type.type);
-            results.emplace_back(i);
-        }
-    }
-    return results;
-}
-
-Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_block) {
-    auto is_mem_reuse = output_block->mem_reuse();
-    MutableBlock mutable_block =
-            is_mem_reuse ? MutableBlock(output_block)
-                         : MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
-                                   _output_row_desc));
-    auto rows = origin_block->rows();
-    if (rows != 0) {
-        auto& mutable_columns = mutable_block.mutable_columns();
-        if (_output_expr_ctxs.empty()) {
-            DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
-            for (int i = 0; i < mutable_columns.size(); ++i) {
-                mutable_columns[i]->insert_range_from(*origin_block->get_by_position(i).column, 0,
-                                                      rows);
-            }
-        } else {
-            DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots());
-            for (int i = 0; i < mutable_columns.size(); ++i) {
-                auto result_column_id = -1;
-                RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
-                auto column_ptr = origin_block->get_by_position(result_column_id)
-                                          .column->convert_to_full_column_if_const();
-                mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
-            }
-        }
-
-        if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
-        DCHECK(output_block->rows() == rows);
-    }
-
-    return Status::OK();
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 0c8c658d58..7db2db48d3 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -147,17 +147,15 @@ public:
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
     ~HashJoinNode() override;
 
-    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
-    Status prepare(RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
-    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-    Status close(RuntimeState* state) override;
+    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+    virtual Status prepare(RuntimeState* state) override;
+    virtual Status open(RuntimeState* state) override;
+    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+    virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+    virtual Status close(RuntimeState* state) override;
     HashTableVariants& get_hash_table_variants() { return _hash_table_variants; }
     void init_join_op();
 
-    const RowDescriptor& row_desc() const override { return _output_row_desc; }
-
 private:
     using VExprContexts = std::vector<VExprContext*>;
 
@@ -170,8 +168,6 @@ private:
     VExprContexts _build_expr_ctxs;
     // other expr
     std::unique_ptr<VExprContext*> _vother_join_conjunct_ptr;
-    // output expr
-    VExprContexts _output_expr_ctxs;
 
     // mark the join column whether support null eq
     std::vector<bool> _is_null_safe_eq_join;
@@ -182,7 +178,6 @@ private:
     std::vector<bool> _probe_not_ignore_null;
 
     std::vector<uint16_t> _probe_column_disguise_null;
-    std::vector<uint16_t> _probe_column_convert_to_null;
 
     DataTypes _right_table_data_types;
     DataTypes _left_table_data_types;
@@ -231,7 +226,6 @@ private:
     bool _have_other_join_conjunct = false;
 
     RowDescriptor _row_desc_for_other_join_conjunt;
-    Block _join_block;
 
     std::vector<uint32_t> _items_counts;
     std::vector<int8_t> _build_block_offsets;
@@ -243,8 +237,6 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
 
-    RowDescriptor _output_row_desc;
-
 private:
     void _hash_table_build_thread(RuntimeState* state, std::promise<Status>* status);
 
@@ -252,22 +244,14 @@ private:
 
     Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset);
 
-    Status _extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
-                                      bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
+    Status extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
+                                     bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
 
-    Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
-                                      bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
+    Status extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs,
+                                     bool& ignore_null, RuntimeProfile::Counter& expr_call_timer);
 
     void _hash_table_init();
 
-    void _prepare_probe_block();
-
-    void _construct_mutable_join_block();
-
-    Status _build_output_block(Block* origin_block, Block* output_block);
-
-    static std::vector<uint16_t> _convert_block_to_null(Block& block);
-
     template <class HashTableContext, bool ignore_null, bool build_unique>
     friend struct ProcessHashTableBuild;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index ea3047a8d1..957b396a64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.external.hudi.HudiTable;
 import org.apache.doris.external.hudi.HudiUtils;
@@ -261,6 +262,8 @@ public class Analyzer {
         // to the last Join clause (represented by its rhs table ref) that outer-joined it
         private final Map<TupleId, TableRef> outerJoinedTupleIds = Maps.newHashMap();
 
+        private final Set<TupleId> outerJoinedMaterializedTupleIds = Sets.newHashSet();
+
         // Map of registered conjunct to the last full outer join (represented by its
         // rhs table ref) that outer joined it.
         public final Map<ExprId, TableRef> fullOuterJoinedConjuncts = Maps.newHashMap();
@@ -786,6 +789,13 @@ public class Analyzer {
         String key = d.getAlias() + "." + col.getName();
         SlotDescriptor result = slotRefMap.get(key);
         if (result != null) {
+            // this is a trick to set slot as nullable when slot is on inline view
+            // When analyze InlineViewRef, we first generate sMap and baseTblSmap and then analyze join.
+            // We have already registered column ref at that time, but we did not know
+            // whether inline view is outer joined. So we have to check it and set slot as nullable here.
+            if (isOuterJoined(d.getId())) {
+                result.setIsNullable(true);
+            }
             result.setMultiRef(true);
             return result;
         }
@@ -941,6 +951,57 @@ public class Analyzer {
         }
     }
 
+    public void registerOuterJoinedMaterilizeTids(List<TupleId> tids) {
+        globalState.outerJoinedMaterializedTupleIds.addAll(tids);
+    }
+
+    /**
+     * The main function of this method is to set the column property on the nullable side of the outer join
+     * to nullable in the case of vectorization.
+     * For example:
+     * Query: select * from t1 left join t2 on t1.k1=t2.k1
+     * Origin: t2.k1 not null
+     * Result: t2.k1 is nullable
+     *
+     * @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable.
+     *     It will report an error and fall back from vectorized mode to non-vectorized mode for execution.
+     *     If the nullside column of the outer join is a column that must return non-null like count(*)
+     *     then there is no way to force the column to be nullable.
+     *     At this time, vectorization cannot support this situation,
+     *     so it is necessary to fall back to non-vectorization for processing.
+     *     For example:
+     *       Query: select * from t1 left join
+     *              (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1
+     *       Origin: tmp.k1 not null, tmp.count_k2 not null
+     *       Result: throw VecNotImplException
+     */
+    public void changeAllOuterJoinTupleToNull() throws VecNotImplException {
+        for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) {
+            for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
+                changeSlotToNull(slotDescriptor);
+            }
+        }
+
+        for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) {
+            for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) {
+                changeSlotToNull(slotDescriptor);
+            }
+        }
+    }
+
+    private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException {
+        if (slotDescriptor.getSourceExprs().isEmpty()) {
+            slotDescriptor.setIsNullable(true);
+            return;
+        }
+        for (Expr sourceExpr : slotDescriptor.getSourceExprs()) {
+            if (!sourceExpr.isNullable()) {
+                throw new VecNotImplException("The slot (" + slotDescriptor.toString()
+                        + ") could not be changed to nullable");
+            }
+        }
+    }
+
     /**
      * Register the given tuple id as being the invisible side of a semi-join.
      */
@@ -1360,6 +1421,10 @@ public class Analyzer {
         return globalState.fullOuterJoinedTupleIds.containsKey(tid);
     }
 
+    public boolean isOuterMaterializedJoined(TupleId tid) {
+        return globalState.outerJoinedMaterializedTupleIds.contains(tid);
+    }
+
     public boolean isFullOuterJoined(SlotId sid) {
         return isFullOuterJoined(getTupleId(sid));
     }
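
    The reverted Analyzer change above forces every slot on the null-producing side of an
    outer join to become nullable, and throws VecNotImplException when a source expression
    such as count(*) can never be null. As a rough illustration of that rule only, here is a
    minimal, self-contained Java sketch; the Slot and VecNotSupported types are invented for
    the example and are not Doris classes.

        import java.util.List;

        class NullableForcingSketch {
            static class Slot {
                final String name;
                boolean nullable;
                // nullability of each source expression; empty for a plain base-table column
                final List<Boolean> sourceExprNullable;
                Slot(String name, List<Boolean> sourceExprNullable) {
                    this.name = name;
                    this.sourceExprNullable = sourceExprNullable;
                }
            }

            static class VecNotSupported extends Exception {
                VecNotSupported(String msg) { super(msg); }
            }

            static void forceNullable(Slot slot) throws VecNotSupported {
                if (slot.sourceExprNullable.isEmpty()) {
                    slot.nullable = true;          // plain column: just relax the NOT NULL property
                    return;
                }
                for (boolean srcNullable : slot.sourceExprNullable) {
                    if (!srcNullable) {            // e.g. count(*) is provably non-null
                        throw new VecNotSupported("slot " + slot.name + " cannot be made nullable");
                    }
                }
                slot.nullable = true;
            }

            public static void main(String[] args) {
                Slot plainColumn = new Slot("t2.k1", List.of());
                Slot countColumn = new Slot("tmp.count_k2", List.of(false));
                try {
                    forceNullable(plainColumn);
                    System.out.println(plainColumn.name + " nullable = " + plainColumn.nullable); // true
                    forceNullable(countColumn);    // mirrors the count(k2) query in the Javadoc
                } catch (VecNotSupported e) {
                    System.out.println("fall back to non-vectorized: " + e.getMessage());
                }
            }
        }

    Running it marks t2.k1 nullable and rejects tmp.count_k2, matching the two example
    queries in the Javadoc of changeAllOuterJoinTupleToNull above.
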
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
index 569be69a98..ed7ba00dd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java
@@ -21,11 +21,9 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Table;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.thrift.TDescriptorTable;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
@@ -115,21 +113,6 @@ public class DescriptorTable {
         return tupleDescs.get(id);
     }
 
-    /**
-     * Return all tuple desc by idList.
-     */
-    public List<TupleDescriptor> getTupleDesc(List<TupleId> idList) throws AnalysisException {
-        List<TupleDescriptor> result = Lists.newArrayList();
-        for (TupleId tupleId : idList) {
-            TupleDescriptor tupleDescriptor = getTupleDesc(tupleId);
-            if (tupleDescriptor == null) {
-                throw new AnalysisException("Invalid tuple id:" + tupleId.toString());
-            }
-            result.add(tupleDescriptor);
-        }
-        return result;
-    }
-
     public SlotDescriptor getSlotDesc(SlotId id) {
         return slotDescs.get(id);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
index 4145ee4536..966cfa7e0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java
@@ -88,34 +88,13 @@ public final class ExprSubstitutionMap {
         return lhs.contains(lhsExpr);
     }
 
-    /**
-     * Returns lhs if the smap contains a mapping for rhsExpr.
-     */
-    public Expr mappingForRhsExpr(Expr rhsExpr) {
-        for (int i = 0; i < rhs.size(); ++i) {
-            if (rhs.get(i).equals(rhsExpr)) {
-                return lhs.get(i);
-            }
-        }
-        return null;
-    }
-
-    public void removeByRhsExpr(Expr rhsExpr) {
-        for (int i = 0; i < rhs.size(); ++i) {
-            if (rhs.get(i).equals(rhsExpr)) {
-                lhs.remove(i);
-                rhs.remove(i);
-                break;
-            }
-        }
-    }
-
     /**
      * Return a map  which is equivalent to applying f followed by g,
      * i.e., g(f()).
      * Always returns a non-null map.
      */
-    public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g, Analyzer analyzer) {
+    public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g,
+                                              Analyzer analyzer) {
         if (f == null && g == null) {
             return new ExprSubstitutionMap();
         }
@@ -151,61 +130,11 @@ public final class ExprSubstitutionMap {
         return result;
     }
 
-    /**
-     * Returns the subtraction of two substitution maps.
-     * f [A.id, B.id] g [A.id, C.id]
-     * return: g-f [B,id, C,id]
-     */
-    public static ExprSubstitutionMap subtraction(ExprSubstitutionMap f, ExprSubstitutionMap g) {
-        if (f == null && g == null) {
-            return new ExprSubstitutionMap();
-        }
-        if (f == null) {
-            return g;
-        }
-        if (g == null) {
-            return f;
-        }
-        ExprSubstitutionMap result = new ExprSubstitutionMap();
-        for (int i = 0; i < g.size(); i++) {
-            if (f.containsMappingFor(g.lhs.get(i))) {
-                result.put(f.get(g.lhs.get(i)), g.rhs.get(i));
-            } else {
-                result.put(g.lhs.get(i), g.rhs.get(i));
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Returns the replace of two substitution maps.
-     * f [A.id, B.id] [A.name, B.name] g [A.id, C.id] [A.age, C.age]
-     * return: [A.id, C,id] [A.name, B.name] [A.age, C.age]
-     */
-    public static ExprSubstitutionMap combineAndReplace(ExprSubstitutionMap f, ExprSubstitutionMap g) {
-        if (f == null && g == null) {
-            return new ExprSubstitutionMap();
-        }
-        if (f == null) {
-            return g;
-        }
-        if (g == null) {
-            return f;
-        }
-        ExprSubstitutionMap result = new ExprSubstitutionMap();
-        result = ExprSubstitutionMap.combine(result, g);
-        for (int i = 0; i < f.size(); i++) {
-            if (!result.containsMappingFor(f.lhs.get(i))) {
-                result.put(f.lhs.get(i), f.rhs.get(i));
-            }
-        }
-        return result;
-    }
-
     /**
      * Returns the union of two substitution maps. Always returns a non-null map.
      */
-    public static ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) {
+    public static ExprSubstitutionMap combine(ExprSubstitutionMap f,
+                                              ExprSubstitutionMap g) {
         if (f == null && g == null) {
             return new ExprSubstitutionMap();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index d6f413e89e..42a3225699 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -40,6 +40,7 @@ import org.apache.doris.common.TableAliasGenerator;
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.SqlUtils;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.ExprRewriter;
@@ -512,6 +513,13 @@ public class SelectStmt extends QueryStmt {
             analyzer.registerConjuncts(whereClause, false, getTableRefIds());
         }
 
+        // Change all outer join tuple to null here after analyze where and from clause
+        // all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info
+        // the solt desc nullable mark must be corrected to make sure BE exec query right.
+        if (VectorizedUtil.isVectorized()) {
+            analyzer.changeAllOuterJoinTupleToNull();
+        }
+
         createSortInfo(analyzer);
         if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) {
             if (groupingInfo != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 6d8b9ddb01..107a9a3637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -489,15 +489,20 @@ public class TableRef implements ParseNode, Writable {
         if (joinOp == JoinOperator.LEFT_OUTER_JOIN
                 || joinOp == JoinOperator.FULL_OUTER_JOIN) {
             analyzer.registerOuterJoinedTids(getId().asList(), this);
+            analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds());
         }
         if (joinOp == JoinOperator.RIGHT_OUTER_JOIN
                 || joinOp == JoinOperator.FULL_OUTER_JOIN) {
             analyzer.registerOuterJoinedTids(leftTblRef.getAllTableRefIds(), this);
+            analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds());
         }
         // register the tuple ids of a full outer join
         if (joinOp == JoinOperator.FULL_OUTER_JOIN) {
             analyzer.registerFullOuterJoinedTids(leftTblRef.getAllTableRefIds(), this);
             analyzer.registerFullOuterJoinedTids(getId().asList(), this);
+
+            analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds());
+            analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds());
         }
 
         // register the tuple id of the rhs of a left semi join
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
similarity index 53%
copy from fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
copy to fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
index 296ae5571b..2c5d12e7d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java
@@ -15,22 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common;
 
-import org.apache.doris.qe.ConnectContext;
-
-public class VectorizedUtil {
-    /**
-     * 1. Return false if there is no current connection (Rule1 to be changed)
-     * 2. Returns the vectorized switch value of the query 'globalState.enableQueryVec'
-     * 3. If it is not currently a query, return the vectorized switch value of the session 'enableVectorizedEngine'
-     * @return true: vec. false: non-vec
-     */
-    public static boolean isVectorized() {
-        ConnectContext connectContext = ConnectContext.get();
-        if (connectContext == null) {
-            return false;
-        }
-        return connectContext.getSessionVariable().enableVectorizedEngine();
+public class VecNotImplException extends UserException {
+    public VecNotImplException(String msg) {
+        super(msg);
     }
 }
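
    VecNotImplException is what lets planning fall back from the vectorized engine for a
    single query. The catch-and-replan itself lives in StmtExecutor (touched by this commit
    but not shown in full here), so the sketch below is hypothetical: the Planner interface
    and planWithFallback helper are invented names used only to illustrate the
    try-vectorized-then-retry pattern.

        class VecFallbackSketch {
            static class VecNotImpl extends Exception {
                VecNotImpl(String msg) { super(msg); }
            }

            interface Planner {
                String plan(boolean vectorized) throws VecNotImpl;
            }

            // try the vectorized engine first; on VecNotImpl, re-plan this query without it
            static String planWithFallback(Planner planner) throws VecNotImpl {
                try {
                    return planner.plan(true);
                } catch (VecNotImpl e) {
                    return planner.plan(false);
                }
            }

            public static void main(String[] args) throws VecNotImpl {
                Planner planner = vectorized -> {
                    if (vectorized) {
                        throw new VecNotImpl("outer join slot cannot be made nullable");
                    }
                    return "row-engine plan";
                };
                System.out.println(planWithFallback(planner));  // prints "row-engine plan"
            }
        }
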
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index 296ae5571b..0eba9f9fc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -17,7 +17,12 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.VariableMgr;
 
 public class VectorizedUtil {
     /**
@@ -33,4 +38,34 @@ public class VectorizedUtil {
         }
         return connectContext.getSessionVariable().enableVectorizedEngine();
     }
+
+    /**
+     * The purpose of this function is to turn off the vectorization switch for the current query.
+     * When the vectorization engine cannot meet the requirements of the current query,
+     * it will convert the current query into a non-vectorized query.
+     * Note that this will only change the **vectorization switch for a single query**,
+     * and will not affect other queries in the same session.
+     * Therefore, even if the vectorization switch of the current query is turned off,
+     * the vectorization properties of subsequent queries will not be affected.
+     *
+     * Session: set enable_vectorized_engine=true;
+     * Query1: select * from table (vec)
+     * Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec)
+     * Query3: select * from table (still vec)
+     */
+    public static void switchToQueryNonVec() {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return;
+        }
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        sessionVariable.setIsSingleSetVar(true);
+        try {
+            VariableMgr.setVar(sessionVariable, new SetVar(
+                    "enable_vectorized_engine",
+                    new StringLiteral("false")));
+        } catch (DdlException e) {
+            // do nothing
+        }
+    }
 }
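
    switchToQueryNonVec() above relies on the session-variable manager treating a "single
    set var" as an override that applies to the current query only. The following
    self-contained sketch illustrates that semantics with an invented SingleQueryVarSketch
    class; it is not the Doris SessionVariable/VariableMgr API, only the idea of a per-query
    override shadowing the session value.

        import java.util.HashMap;
        import java.util.Map;

        class SingleQueryVarSketch {
            private final Map<String, String> sessionVars = new HashMap<>();
            private final Map<String, String> queryOverrides = new HashMap<>();

            void setSessionVar(String key, String value) { sessionVars.put(key, value); }

            // rough analogue of setIsSingleSetVar(true) followed by VariableMgr.setVar(...)
            void setForCurrentQueryOnly(String key, String value) { queryOverrides.put(key, value); }

            String get(String key) { return queryOverrides.getOrDefault(key, sessionVars.get(key)); }

            void onQueryFinished() { queryOverrides.clear(); }  // overrides never outlive one query

            public static void main(String[] args) {
                SingleQueryVarSketch s = new SingleQueryVarSketch();
                s.setSessionVar("enable_vectorized_engine", "true");          // session switch stays on
                s.setForCurrentQueryOnly("enable_vectorized_engine", "false"); // Query2 falls back
                System.out.println(s.get("enable_vectorized_engine"));        // false for this query
                s.onQueryFinished();
                System.out.println(s.get("enable_vectorized_engine"));        // true again for Query3
            }
        }
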
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index c8561b54dc..a83aedaa49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -25,7 +25,6 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotId;
-import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
@@ -312,7 +311,7 @@ public class AggregationNode extends PlanNode {
     }
 
     @Override
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
+    public Set<SlotId> computeInputSlotIds() throws NotImplementedException {
         Set<SlotId> result = Sets.newHashSet();
         // compute group by slot
         ArrayList<Expr> groupingExprs = aggInfo.getGroupingExprs();
@@ -325,19 +324,6 @@ public class AggregationNode extends PlanNode {
         List<SlotId> aggregateSlotIds = Lists.newArrayList();
         Expr.getIds(aggregateExprs, null, aggregateSlotIds);
         result.addAll(aggregateSlotIds);
-
-        // case: select count(*) from test
-        // result is empty
-        // Actually need to take a column as the input column of the agg operator
-        if (result.isEmpty()) {
-            TupleDescriptor tupleDesc = analyzer.getTupleDesc(getChild(0).getOutputTupleIds().get(0));
-            // If the query result is empty set such as: select count(*) from table where 1=2
-            // then the materialized slot will be empty
-            // So the result should be empty also.
-            if (!tupleDesc.getMaterializedSlots().isEmpty()) {
-                result.add(tupleDesc.getMaterializedSlots().get(0).getId());
-            }
-        }
         return result;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 3332182069..9192c0981f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -29,12 +29,10 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TableRef;
-import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.ColumnStats;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CheckedMath;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.Pair;
@@ -56,7 +54,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +84,6 @@ public class HashJoinNode extends PlanNode {
     private boolean isBucketShuffle = false; // the flag for bucket shuffle join
 
     private List<SlotId> hashOutputSlotIds;
-    private TupleDescriptor vOutputTupleDesc;
-    private ExprSubstitutionMap vSrcToOutputSMap;
 
     /**
      * Constructor of HashJoinNode.
@@ -254,100 +249,38 @@ public class HashJoinNode extends PlanNode {
      *
      * @param slotIdList
      */
-    private void initHashOutputSlotIds(List<SlotId> slotIdList, Analyzer analyzer) {
-        Set<SlotId> hashOutputSlotIdSet = Sets.newHashSet();
-        // step1: change output slot id to src slot id
-        if (vSrcToOutputSMap != null) {
-            for (SlotId slotId : slotIdList) {
-                SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
-                Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
-                if (srcExpr == null) {
-                    hashOutputSlotIdSet.add(slotId);
-                } else {
-                    List<SlotRef> srcSlotRefList = Lists.newArrayList();
-                    srcExpr.collect(SlotRef.class, srcSlotRefList);
-                    hashOutputSlotIdSet
-                            .addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList()));
-                }
-            }
-        }
-
-        // step2: add conjuncts required slots
+    private void initHashOutputSlotIds(List<SlotId> slotIdList) {
+        hashOutputSlotIds = new ArrayList<>(slotIdList);
         List<SlotId> otherAndConjunctSlotIds = Lists.newArrayList();
         Expr.getIds(otherJoinConjuncts, null, otherAndConjunctSlotIds);
         Expr.getIds(conjuncts, null, otherAndConjunctSlotIds);
-        hashOutputSlotIdSet.addAll(otherAndConjunctSlotIds);
-        hashOutputSlotIds = new ArrayList<>(hashOutputSlotIdSet);
+        for (SlotId slotId : otherAndConjunctSlotIds) {
+            if (!hashOutputSlotIds.contains(slotId)) {
+                hashOutputSlotIds.add(slotId);
+            }
+        }
     }
 
     @Override
     public void initOutputSlotIds(Set<SlotId> requiredSlotIdSet, Analyzer analyzer) {
         outputSlotIds = Lists.newArrayList();
-        List<TupleDescriptor> outputTupleDescList = Lists.newArrayList();
-        if (vOutputTupleDesc != null) {
-            outputTupleDescList.add(vOutputTupleDesc);
-        } else {
-            for (TupleId tupleId : tupleIds) {
-                outputTupleDescList.add(analyzer.getTupleDesc(tupleId));
-            }
-        }
-        for (TupleDescriptor tupleDescriptor : outputTupleDescList) {
-            for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
-                if (slotDescriptor.isMaterialized()
-                        && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) {
+        for (TupleId tupleId : tupleIds) {
+            for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) {
+                if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains(
+                        slotDescriptor.getId()))) {
                     outputSlotIds.add(slotDescriptor.getId());
                 }
             }
         }
-        initHashOutputSlotIds(outputSlotIds, analyzer);
-    }
-
-    @Override
-    public void projectOutputTuple() throws NotImplementedException {
-        if (vOutputTupleDesc == null) {
-            return;
-        }
-        if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) {
-            return;
-        }
-        Iterator<SlotDescriptor> iterator = vOutputTupleDesc.getSlots().iterator();
-        while (iterator.hasNext()) {
-            SlotDescriptor slotDescriptor = iterator.next();
-            boolean keep = false;
-            for (SlotId outputSlotId : outputSlotIds) {
-                if (slotDescriptor.getId().equals(outputSlotId)) {
-                    keep = true;
-                    break;
-                }
-            }
-            if (!keep) {
-                iterator.remove();
-                SlotRef slotRef = new SlotRef(slotDescriptor);
-                vSrcToOutputSMap.removeByRhsExpr(slotRef);
-            }
-        }
-        vOutputTupleDesc.computeStatAndMemLayout();
+        initHashOutputSlotIds(outputSlotIds);
     }
 
     // output slots + predicate slots = input slots
     @Override
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
-        Set<SlotId> result = Sets.newHashSet();
+    public Set<SlotId> computeInputSlotIds() throws NotImplementedException {
         Preconditions.checkState(outputSlotIds != null);
-        // step1: change output slot id to src slot id
-        if (vSrcToOutputSMap != null) {
-            for (SlotId slotId : outputSlotIds) {
-                SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
-                Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
-                if (srcExpr == null) {
-                    result.add(slotId);
-                } else {
-                    List<SlotRef> srcSlotRefList = Lists.newArrayList();
-                    srcExpr.collect(SlotRef.class, srcSlotRefList);
-                    result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList()));
-                }
-            }
-        }
+        Set<SlotId> result = Sets.newHashSet();
+        result.addAll(outputSlotIds);
         // eq conjunct
         List<SlotId> eqConjunctSlotIds = Lists.newArrayList();
         Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds);
@@ -374,109 +307,14 @@ public class HashJoinNode extends PlanNode {
 
         ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap();
         List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
-        eqJoinConjuncts =
-                newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity).collect(Collectors.toList());
+        eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity)
+                .collect(Collectors.toList());
         assignedConjuncts = analyzer.getAssignedConjuncts();
         otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
-
-        // Only for Vec: create new tuple for join result
-        if (VectorizedUtil.isVectorized()) {
-            computeOutputTuple(analyzer);
-        }
-    }
-
-    private void computeOutputTuple(Analyzer analyzer) throws AnalysisException {
-        // 1. create new tuple
-        vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
-        boolean copyLeft = false;
-        boolean copyRight = false;
-        boolean leftNullable = false;
-        boolean rightNullable = false;
-        switch (joinOp) {
-            case INNER_JOIN:
-            case CROSS_JOIN:
-                copyLeft = true;
-                copyRight = true;
-                break;
-            case LEFT_OUTER_JOIN:
-                copyLeft = true;
-                copyRight = true;
-                rightNullable = true;
-                break;
-            case RIGHT_OUTER_JOIN:
-                copyLeft = true;
-                copyRight = true;
-                leftNullable = true;
-                break;
-            case FULL_OUTER_JOIN:
-                copyLeft = true;
-                copyRight = true;
-                leftNullable = true;
-                rightNullable = true;
-                break;
-            case LEFT_ANTI_JOIN:
-            case LEFT_SEMI_JOIN:
-            case NULL_AWARE_LEFT_ANTI_JOIN:
-                copyLeft = true;
-                break;
-            case RIGHT_ANTI_JOIN:
-            case RIGHT_SEMI_JOIN:
-                copyRight = true;
-                break;
-            default:
-                break;
-        }
-        ExprSubstitutionMap srcTblRefToOutputTupleSmap = new ExprSubstitutionMap();
-        if (copyLeft) {
-            for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(getChild(0).getOutputTblRefIds())) {
-                for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) {
-                    if (!isMaterailizedByChild(leftSlotDesc, getChild(0).getOutputSmap())) {
-                        continue;
-                    }
-                    SlotDescriptor outputSlotDesc =
-                            analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc);
-                    if (leftNullable) {
-                        outputSlotDesc.setIsNullable(true);
-                    }
-                    srcTblRefToOutputTupleSmap.put(new SlotRef(leftSlotDesc), new SlotRef(outputSlotDesc));
-                }
-            }
-        }
-        if (copyRight) {
-            for (TupleDescriptor rightTupleDesc :
-                    analyzer.getDescTbl().getTupleDesc(getChild(1).getOutputTblRefIds())) {
-                for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) {
-                    if (!isMaterailizedByChild(rightSlotDesc, getChild(1).getOutputSmap())) {
-                        continue;
-                    }
-                    SlotDescriptor outputSlotDesc =
-                            analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc);
-                    if (rightNullable) {
-                        outputSlotDesc.setIsNullable(true);
-                    }
-                    srcTblRefToOutputTupleSmap.put(new SlotRef(rightSlotDesc), new SlotRef(outputSlotDesc));
-                }
-            }
-        }
-        // 2. compute srcToOutputMap
-        vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap);
-        for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
-            Preconditions.checkState(vSrcToOutputSMap.getRhs().get(i) instanceof SlotRef);
-            SlotRef rSlotRef = (SlotRef) vSrcToOutputSMap.getRhs().get(i);
-            if (vSrcToOutputSMap.getLhs().get(i) instanceof SlotRef) {
-                SlotRef lSlotRef = (SlotRef) vSrcToOutputSMap.getLhs().get(i);
-                rSlotRef.getDesc().setIsMaterialized(lSlotRef.getDesc().isMaterialized());
-            } else {
-                rSlotRef.getDesc().setIsMaterialized(true);
-            }
-        }
-        vOutputTupleDesc.computeStatAndMemLayout();
-        // 3. change the outputSmap
-        outputSmap = ExprSubstitutionMap.combineAndReplace(outputSmap, srcTblRefToOutputTupleSmap);
     }
 
     private void replaceOutputSmapForOuterJoin() {
-        if (joinOp.isOuterJoin() && !VectorizedUtil.isVectorized()) {
+        if (joinOp.isOuterJoin()) {
             List<Expr> lhs = new ArrayList<>();
             List<Expr> rhs = new ArrayList<>();
 
@@ -909,14 +747,6 @@ public class HashJoinNode extends PlanNode {
                 msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt());
             }
         }
-        if (vSrcToOutputSMap != null) {
-            for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
-                msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift());
-            }
-        }
-        if (vOutputTupleDesc != null) {
-            msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt());
-        }
     }
 
     @Override
@@ -951,9 +781,6 @@ public class HashJoinNode extends PlanNode {
         }
         output.append(detailPrefix).append(String.format("cardinality=%s", cardinality)).append("\n");
         // todo unify in plan node
-        if (vOutputTupleDesc != null) {
-            output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId());
-        }
         if (outputSlotIds != null) {
             output.append(detailPrefix).append("output slot ids: ");
             for (SlotId slotId : outputSlotIds) {
@@ -1003,72 +830,4 @@ public class HashJoinNode extends PlanNode {
         }
         super.convertToVectoriezd();
     }
-
-    /**
-     * If parent wants to get hash join node tupleids,
-     * it will call this function instead of read properties directly.
-     * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for hash join node.
-     *
-     * If you read the properties of @tupleids directly instead of this function,
-     * it reads the input id of the current node.
-     */
-    @Override
-    public ArrayList<TupleId> getTupleIds() {
-        Preconditions.checkState(tupleIds != null);
-        if (vOutputTupleDesc != null) {
-            return Lists.newArrayList(vOutputTupleDesc.getId());
-        }
-        return tupleIds;
-    }
-
-    @Override
-    public ArrayList<TupleId> getOutputTblRefIds() {
-        switch (joinOp) {
-            case LEFT_SEMI_JOIN:
-            case LEFT_ANTI_JOIN:
-            case NULL_AWARE_LEFT_ANTI_JOIN:
-                return getChild(0).getOutputTblRefIds();
-            case RIGHT_SEMI_JOIN:
-            case RIGHT_ANTI_JOIN:
-                return getChild(1).getOutputTblRefIds();
-            default:
-                return getTblRefIds();
-        }
-    }
-
-    @Override
-    public ArrayList<TupleId> getOutputTupleIds() {
-        if (vOutputTupleDesc != null) {
-            return Lists.newArrayList(vOutputTupleDesc.getId());
-        }
-        switch (joinOp) {
-            case LEFT_SEMI_JOIN:
-            case LEFT_ANTI_JOIN:
-            case NULL_AWARE_LEFT_ANTI_JOIN:
-                return getChild(0).getOutputTupleIds();
-            case RIGHT_SEMI_JOIN:
-            case RIGHT_ANTI_JOIN:
-                return getChild(1).getOutputTupleIds();
-            default:
-                return tupleIds;
-        }
-    }
-
-    private boolean isMaterailizedByChild(SlotDescriptor slotDesc, ExprSubstitutionMap smap) {
-        if (slotDesc.isMaterialized()) {
-            return true;
-        }
-        Expr child = smap.get(new SlotRef(slotDesc));
-        if (child == null) {
-            return false;
-        }
-        List<SlotRef> slotRefList = Lists.newArrayList();
-        child.collect(SlotRef.class, slotRefList);
-        for (SlotRef slotRef : slotRefList) {
-            if (slotRef.getDesc() != null && !slotRef.getDesc().isMaterialized()) {
-                return false;
-            }
-        }
-        return true;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 5f1d4293b0..466e69a53c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -894,6 +894,7 @@ public class OlapScanNode extends ScanNode {
             SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN);
             deleteSignSlot.analyze(analyzer);
             deleteSignSlot.getDesc().setIsMaterialized(true);
+            deleteSignSlot.getDesc().setIsNullable(analyzer.isOuterMaterializedJoined(desc.getId()));
             Expr conjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, deleteSignSlot, new IntLiteral(0));
             conjunct.analyze(analyzer);
             conjuncts.add(conjunct);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1d7c9b8273..2a030fbf7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -318,14 +318,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         tblRefIds = ids;
     }
 
-    public ArrayList<TupleId> getOutputTblRefIds() {
-        return tblRefIds;
-    }
-
-    public ArrayList<TupleId> getOutputTupleIds() {
-        return tupleIds;
-    }
-
     public Set<TupleId> getNullableTupleIds() {
         Preconditions.checkState(nullableTupleIds != null);
         return nullableTupleIds;
@@ -961,11 +953,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         throw new NotImplementedException("The `initOutputSlotIds` hasn't been implemented in " + planNodeName);
     }
 
-    public void projectOutputTuple() throws NotImplementedException {
-        throw new NotImplementedException("The `projectOutputTuple` hasn't been implemented in " + planNodeName + ". "
-        + "But it does not affect the project optimizer");
-    }
-
     /**
      * If an plan node implements this method, its child plan node has the ability to implement the project.
      * The return value of this method will be used as
@@ -985,7 +972,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
      *         agg node
      *    (required slots: a.k1)
      */
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
+    public Set<SlotId> computeInputSlotIds() throws NotImplementedException {
         throw new NotImplementedException("The `computeInputSlotIds` hasn't been implemented in " + planNodeName);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java
index 643d9ae863..649c6d5270 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java
@@ -47,7 +47,6 @@ public class ProjectPlanner {
     public void projectPlanNode(Set<SlotId> outputSlotIds, PlanNode planNode) {
         try {
             planNode.initOutputSlotIds(outputSlotIds, analyzer);
-            planNode.projectOutputTuple();
         } catch (NotImplementedException e) {
             LOG.debug(e);
         }
@@ -56,7 +55,7 @@ public class ProjectPlanner {
         }
         Set<SlotId> inputSlotIds = null;
         try {
-            inputSlotIds = planNode.computeInputSlotIds(analyzer);
+            inputSlotIds = planNode.computeInputSlotIds();
         } catch (NotImplementedException e) {
             LOG.debug(e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index 6e56f6ffd2..95a13061e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -347,7 +347,6 @@ public abstract class SetOperationNode extends PlanNode {
     @Override
     public void init(Analyzer analyzer) throws UserException {
         Preconditions.checkState(conjuncts.isEmpty());
-        createDefaultSmap(analyzer);
         computeTupleStatAndMemLayout(analyzer);
         computeStats(analyzer);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 32d0f3961d..38711d025f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1354,14 +1354,9 @@ public class SingleNodePlanner {
                 }
                 unionNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
                 unionNode.addConstExprList(selectStmt.getBaseTblResultExprs());
-                unionNode.init(analyzer);
                 //set outputSmap to substitute literal in outputExpr
-                unionNode.setWithoutTupleIsNullOutputSmap(inlineViewRef.getSmap());
-                if (analyzer.isOuterJoined(inlineViewRef.getId())) {
-                    List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs(
-                            inlineViewRef.getSmap().getRhs(), unionNode.getTupleIds(), analyzer);
-                    unionNode.setOutputSmap(new ExprSubstitutionMap(inlineViewRef.getSmap().getLhs(), nullableRhs));
-                }
+                unionNode.setOutputSmap(inlineViewRef.getSmap());
+                unionNode.init(analyzer);
                 return unionNode;
             }
         }
@@ -1389,6 +1384,15 @@ public class SingleNodePlanner {
             List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs(
                     outputSmap.getRhs(), rootNode.getTupleIds(), analyzer);
             outputSmap = new ExprSubstitutionMap(outputSmap.getLhs(), nullableRhs);
+            // When we process outer join with inline views, we set slot descriptor of inline view to nullable firstly.
+            // When we generate plan, we remove inline view, so the upper node's input is inline view's child.
+            // So we need to set slot descriptor of inline view's child to nullable to ensure consistent behavior
+            // with BaseTable.
+            for (TupleId tupleId : rootNode.getTupleIds()) {
+                for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getMaterializedSlots()) {
+                    slotDescriptor.setIsNullable(true);
+                }
+            }
         }
         // Set output smap of rootNode *before* creating a SelectNode for proper resolution.
         rootNode.setOutputSmap(outputSmap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 09b783a8c5..2a7d3e7b29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -161,7 +161,7 @@ public class SortNode extends PlanNode {
     }
 
     @Override
-    public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedException {
+    public Set<SlotId> computeInputSlotIds() throws NotImplementedException {
         List<SlotId> result = Lists.newArrayList();
         Expr.getIds(resolvedTupleExprs, null, result);
         return new HashSet<>(result);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3ed8f26156..b50f77fb3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -70,6 +70,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
@@ -79,6 +80,7 @@ import org.apache.doris.common.util.QueryPlannerProfile;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
@@ -606,6 +608,13 @@ public class StmtExecutor implements ProfileWriter {
                     } else {
                         resetAnalyzerAndStmt();
                     }
+                } catch (VecNotImplException e) {
+                    if (i == analyzeTimes) {
+                        throw e;
+                    } else {
+                        resetAnalyzerAndStmt();
+                        VectorizedUtil.switchToQueryNonVec();
+                    }
                 } catch (UserException e) {
                     throw e;
                 } catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java
index 269daa39ff..de0d525daa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.VecNotImplException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.FoldConstantsRule;
@@ -244,6 +245,13 @@ public class QueryStmtTest {
             constMap.clear();
             constMap = getConstantExprMap(exprsMap, analyzer);
             Assert.assertEquals(4, constMap.size());
+        } else {
+            try {
+                UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
+                Assert.fail();
+            } catch (VecNotImplException e) {
+                Assert.assertTrue(e.getMessage().contains("could not be changed to nullable"));
+            }
         }
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java
index 375afd5fef..0159edba6c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java
@@ -87,8 +87,8 @@ public class ProjectPlannerFunctionTest {
         String queryStr = "desc verbose select a.k2 from test.t1 a inner join test.t1 b on a.k1=b.k1 "
                 + "inner join test.t1 c on a.k1=c.k1;";
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
-        Assert.assertTrue(explainString.contains("output slot ids: 8"));
-        Assert.assertTrue(explainString.contains("output slot ids: 4 5"));
+        Assert.assertTrue(explainString.contains("output slot ids: 3"));
+        Assert.assertTrue(explainString.contains("output slot ids: 0 3"));
     }
 
     // keep a.k2 after a join b
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index a439072b39..8d9a115094 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1136,21 +1136,20 @@ public class QueryPlanTest extends TestWithFeService {
         Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE"));
 
         // support recurse of bucket shuffle join
-        // TODO: support the UT in the future
         queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2"
                 + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3"
                 + " on t2.k1 = t3.k1 and t2.k2 = t3.k2";
         explainString = getSQLPlanOrErrorMsg(queryStr);
-        // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
-        //  Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
 
         // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name
         queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2"
                 + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3"
                 + " on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
         explainString = getSQLPlanOrErrorMsg(queryStr);
-        //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
-        //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
 
         // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join
         queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2"
@@ -1183,9 +1182,6 @@ public class QueryPlanTest extends TestWithFeService {
             }
         }
 
-        // disable bucket shuffle join
-        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
-
         String queryStr = "explain select * from mysql_table t2, jointest t1 where t1.k1 = t2.k1";
         String explainString = getSQLPlanOrErrorMsg(queryStr);
         Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)"));
@@ -1233,8 +1229,6 @@ public class QueryPlanTest extends TestWithFeService {
             }
         }
 
-        // disable bucket shuffle join
-        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
         String queryStr = "explain select * from odbc_mysql t2, jointest t1 where t1.k1 = t2.k1";
         String explainString = getSQLPlanOrErrorMsg(queryStr);
         Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)"));
@@ -1329,9 +1323,7 @@ public class QueryPlanTest extends TestWithFeService {
     @Test
     public void testPreferBroadcastJoin() throws Exception {
         connectContext.setDatabase("default_cluster:test");
-        String queryStr = "explain select * from (select k2 from jointest)t2, jointest t1 where t1.k1 = t2.k2";
-        // disable bucket shuffle join
-        Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
+        String queryStr = "explain select * from (select k2 from jointest group by k2)t2, jointest t1 where t1.k1 = t2.k2";
 
         // default set PreferBroadcastJoin true
         String explainString = getSQLPlanOrErrorMsg(queryStr);
@@ -1597,31 +1589,32 @@ public class QueryPlanTest extends TestWithFeService {
         //valid date
         String sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a right outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)";
         String explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>"));
+        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:`a`.`aid` | 4"));
 
         sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a left outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>"));
+        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:3 | `b`.`bid`"));
 
         sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a full outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>"));
+        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:`a`.`aid` | `b`.`bid`"));
 
         sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:<slot 2> | <slot 3>"));
+        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:3 | 4"));
 
         sql = "SELECT a.k1, b.k2 FROM (SELECT k1 from baseall) a LEFT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>"));
+        Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)"));
 
         sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a RIGHT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>"));
+        Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)"));
 
         sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a FULL JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
         explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
-        Assert.assertTrue(explainString.contains("<slot 5> | <slot 7>"));
+        Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)"));
+        Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)"));
     }
 
     @Test
@@ -2070,7 +2063,7 @@ public class QueryPlanTest extends TestWithFeService {
         String explainString = getSQLPlanOrErrorMsg(queryStr);
         Assert.assertFalse(explainString.contains("OUTPUT EXPRS:3 | 4"));
         System.out.println(explainString);
-        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:CAST(<slot 4> AS INT) | CAST(<slot 5> AS INT)"));
+        Assert.assertTrue(explainString.contains("OUTPUT EXPRS:CAST(`a`.`aid` AS INT) | 4"));
     }
 
     @Test
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 06d6e26c3b..fb3a87fc4a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -409,10 +409,6 @@ struct THashJoinNode {
 
   // hash output column
   6: optional list<Types.TSlotId> hash_output_slot_ids
-
-  7: optional list<Exprs.TExpr> srcExprList
-
-  8: optional Types.TTupleId voutput_tuple_id
 }
 
 struct TMergeJoinNode {

