You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/23 02:56:49 UTC

[doris] branch branch-1.2-lts updated: [feature](planner) mark join to support subquery in disjunction (#14579) (#15291)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new fd51474649 [feature](planner) mark join to support subquery in disjunction (#14579) (#15291)
fd51474649 is described below

commit fd514746494360790ecd11000deaf979ce8f759b
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Dec 23 10:56:42 2022 +0800

    [feature](planner) mark join to support subquery in disjunction (#14579) (#15291)
    
    cherry-pick #14579
---
 be/src/vec/exec/join/vjoin_node_base.cpp           |  16 ++
 be/src/vec/exec/join/vjoin_node_base.h             |   1 +
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |  86 ++++++--
 be/src/vec/exec/join/vnested_loop_join_node.h      |  12 +-
 .../org/apache/doris/analysis/AnalyticExpr.java    |   5 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |  46 +++-
 .../main/java/org/apache/doris/analysis/Expr.java  |  26 ++-
 .../apache/doris/analysis/FunctionCallExpr.java    |   5 +-
 .../org/apache/doris/analysis/JoinOperator.java    |   8 +-
 .../java/org/apache/doris/analysis/SelectStmt.java |   3 +
 .../org/apache/doris/analysis/StmtRewriter.java    | 244 ++++++++++++++-------
 .../java/org/apache/doris/analysis/TableRef.java   |  25 ++-
 .../org/apache/doris/planner/JoinNodeBase.java     |  12 +-
 .../apache/doris/planner/NestedLoopJoinNode.java   |   1 +
 .../apache/doris/planner/SingleNodePlanner.java    |  16 +-
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 .../correctness/test_subquery_in_disjunction.out   |  26 +++
 .../test_subquery_in_disjunction.groovy            |  83 +++++++
 18 files changed, 486 insertions(+), 131 deletions(-)

diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index 2ebedff219..9befa4f4bb 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -48,8 +48,19 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
                              _join_op == TJoinOp::LEFT_SEMI_JOIN ||
                              _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN),
           _is_outer_join(_match_all_build || _match_all_probe),
+          _is_mark_join(tnode.__isset.nested_loop_join_node
+                                ? (tnode.nested_loop_join_node.__isset.is_mark
+                                           ? tnode.nested_loop_join_node.is_mark
+                                           : false)
+                                : false),
           _short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     _init_join_op();
+    if (_is_mark_join) {
+        DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN ||
+               _join_op == TJoinOp::CROSS_JOIN)
+                << "Mark join is only supported for left semi/anti join and cross join but this is "
+                << _join_op;
+    }
     if (tnode.__isset.hash_join_node) {
         _output_row_desc.reset(
                 new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
@@ -83,6 +94,11 @@ void VJoinNodeBase::_construct_mutable_join_block() {
             _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()});
         }
     }
+    if (_is_mark_join) {
+        _join_block.replace_by_position(
+                _join_block.columns() - 1,
+                remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
+    }
 }
 
 Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) {
diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h
index f2bdc6eced..5c73e0a0c6 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -87,6 +87,7 @@ protected:
     const bool _is_right_semi_anti;
     const bool _is_left_semi_anti;
     const bool _is_outer_join;
+    const bool _is_mark_join;
 
     // For null aware left anti join, we apply a short circuit strategy.
     // 1. Set _short_circuit_for_null_in_build_side to true if join operator is null aware left anti join.
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 2d3a954903..7c3f29c854 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -225,7 +225,6 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
 
     _join_block.clear_column_data();
     MutableBlock mutable_join_block(&_join_block);
-    auto& dst_columns = mutable_join_block.mutable_columns();
 
     std::stack<uint16_t> offset_stack;
     RETURN_IF_ERROR(std::visit(
@@ -251,7 +250,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                             if constexpr (set_build_side_flag) {
                                 offset_stack.push(mutable_join_block.rows());
                             }
-                            _process_left_child_block(dst_columns, now_process_build_block);
+                            _process_left_child_block(mutable_join_block, now_process_build_block);
                         } while (mutable_join_block.rows() < state->batch_size() &&
                                  _current_build_pos < _build_blocks.size());
                     }
@@ -261,7 +260,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                         Status status = _do_filtering_and_update_visited_flags<set_build_side_flag,
                                                                                set_probe_side_flag>(
                                 &tmp_block, offset_stack, !_is_left_semi_anti);
-                        _update_tuple_is_null_column(&tmp_block);
+                        _update_additional_flags(&tmp_block);
                         if (!status.OK()) {
                             return status;
                         }
@@ -274,15 +273,21 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                             if (!_matched_rows_done) {
                                 _finalize_current_phase<false, JoinOpType::value ==
                                                                        TJoinOp::LEFT_SEMI_JOIN>(
-                                        dst_columns, state->batch_size());
-                                _reset_with_next_probe_row(dst_columns);
+                                        mutable_join_block, state->batch_size());
+                                _reset_with_next_probe_row();
                             }
                             break;
                         }
                     }
 
                     if (!_matched_rows_done && _current_build_pos == _build_blocks.size()) {
-                        _reset_with_next_probe_row(dst_columns);
+                        if (_is_mark_join && _build_blocks.empty()) {
+                            DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN);
+                            _append_left_data_with_null(mutable_join_block);
+                            _reset_with_next_probe_row();
+                            break;
+                        }
+                        _reset_with_next_probe_row();
                     }
                 }
                 if constexpr (!set_probe_side_flag) {
@@ -290,7 +295,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
                     Status status = _do_filtering_and_update_visited_flags<set_build_side_flag,
                                                                            set_probe_side_flag>(
                             &tmp_block, offset_stack, !_is_right_semi_anti);
-                    _update_tuple_is_null_column(&tmp_block);
+                    _update_additional_flags(&tmp_block);
                     mutable_join_block = MutableBlock(std::move(tmp_block));
                     if (!status.OK()) {
                         return status;
@@ -299,10 +304,9 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
 
                 if constexpr (set_build_side_flag) {
                     if (_matched_rows_done && _output_null_idx_build_side < _build_blocks.size()) {
-                        auto& cols = mutable_join_block.mutable_columns();
                         _finalize_current_phase<true,
                                                 JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN>(
-                                cols, state->batch_size());
+                                mutable_join_block, state->batch_size());
                     }
                 }
                 return Status::OK();
@@ -328,8 +332,37 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool* eo
     return Status::OK();
 }
 
-void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns,
+void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_block) const {
+    auto& dst_columns = mutable_block.mutable_columns();
+    DCHECK(_is_mark_join);
+    for (size_t i = 0; i < _num_probe_side_columns; ++i) {
+        const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
+        if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
+            auto origin_sz = dst_columns[i]->size();
+            DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
+            assert_cast<ColumnNullable*>(dst_columns[i].get())
+                    ->get_nested_column_ptr()
+                    ->insert_many_from(*src_column.column, _left_block_pos, 1);
+            assert_cast<ColumnNullable*>(dst_columns[i].get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .resize_fill(origin_sz + 1, 0);
+        } else {
+            dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1);
+        }
+    }
+    for (size_t i = 0; i < _num_build_side_columns; ++i) {
+        dst_columns[_num_probe_side_columns + i]->insert_default();
+    }
+    IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
+                                         *dst_columns[dst_columns.size() - 1])
+                                         .get_data();
+    mark_data.resize_fill(mark_data.size() + 1, 0);
+}
+
+void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
                                                     const Block& now_process_build_block) const {
+    auto& dst_columns = mutable_block.mutable_columns();
     const int max_added_rows = now_process_build_block.rows();
     for (size_t i = 0; i < _num_probe_side_columns; ++i) {
         const ColumnWithTypeAndName& src_column = _left_block.get_by_position(i);
@@ -367,7 +400,7 @@ void VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns,
     }
 }
 
-void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) {
+void VNestedLoopJoinNode::_update_additional_flags(Block* block) {
     if (_is_outer_join) {
         auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
         auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
@@ -383,6 +416,15 @@ void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) {
             right_null_map.get_data().resize_fill(block->rows(), 0);
         }
     }
+    if (_is_mark_join) {
+        IColumn::Filter& mark_data =
+                assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
+                        *block->get_by_position(block->columns() - 1).column->assume_mutable())
+                        .get_data();
+        if (mark_data.size() < block->rows()) {
+            mark_data.resize_fill(block->rows(), 1);
+        }
+    }
 }
 
 void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) {
@@ -396,10 +438,12 @@ void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) {
 }
 
 template <bool BuildSide, bool IsSemi>
-void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, size_t batch_size) {
+void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, size_t batch_size) {
+    auto& dst_columns = mutable_block.mutable_columns();
     DCHECK_GT(dst_columns.size(), 0);
     auto pre_size = dst_columns[0]->size();
     if constexpr (BuildSide) {
+        DCHECK(!_is_mark_join);
         auto build_block_sz = _build_blocks.size();
         size_t i = _output_null_idx_build_side;
         for (; i < build_block_sz; i++) {
@@ -470,15 +514,26 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
         _output_null_idx_build_side = i;
     } else {
         if constexpr (IsSemi) {
-            if (!_cur_probe_row_visited_flags) {
+            if (!_cur_probe_row_visited_flags && !_is_mark_join) {
                 return;
             }
         } else {
-            if (_cur_probe_row_visited_flags) {
+            if (_cur_probe_row_visited_flags && !_is_mark_join) {
                 return;
             }
         }
 
+        if (_is_mark_join) {
+            IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
+                                                 *dst_columns[dst_columns.size() - 1])
+                                                 .get_data();
+            mark_data.resize_fill(mark_data.size() + 1,
+                                  (IsSemi && !_cur_probe_row_visited_flags) ||
+                                                  (!IsSemi && _cur_probe_row_visited_flags)
+                                          ? 0
+                                          : 1);
+        }
+
         DCHECK_LT(_left_block_pos, _left_block.rows());
         for (size_t i = 0; i < _num_probe_side_columns; ++i) {
             const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
@@ -511,7 +566,8 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
     }
 }
 
-void VNestedLoopJoinNode::_reset_with_next_probe_row(MutableColumns& dst_columns) {
+void VNestedLoopJoinNode::_reset_with_next_probe_row() {
+    // TODO: need a vector of left block to register the _probe_row_visited_flags
     _cur_probe_row_visited_flags = false;
     _current_build_pos = 0;
     _left_block_pos++;
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h
index 45644f9e8e..aa63728cce 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -67,7 +67,7 @@ private:
     // Processes a block from the left child.
     //  dst_columns: left_child_row and now_process_build_block to construct a bundle column of new block
     //  now_process_build_block: right child block now to process
-    void _process_left_child_block(MutableColumns& dst_columns,
+    void _process_left_child_block(MutableBlock& mutable_block,
                                    const Block& now_process_build_block) const;
 
     template <bool SetBuildSideFlag, bool SetProbeSideFlag>
@@ -75,19 +75,23 @@ private:
                                                   bool materialize);
 
     template <bool BuildSide, bool IsSemi>
-    void _finalize_current_phase(MutableColumns& dst_columns, size_t batch_size);
+    void _finalize_current_phase(MutableBlock& mutable_block, size_t batch_size);
 
-    void _reset_with_next_probe_row(MutableColumns& dst_columns);
+    void _reset_with_next_probe_row();
 
     void _release_mem();
 
     Status get_left_side(RuntimeState* state, Block* block);
 
     // add tuple is null flag column to Block for filter conjunct and output expr
-    void _update_tuple_is_null_column(Block* block);
+    void _update_additional_flags(Block* block);
 
     void _add_tuple_is_null_column(Block* block) override;
 
+    // For mark join, if the relation from right side is empty, we should construct intermediate
+    // block with data from left side and filled with null for right side
+    void _append_left_data_with_null(MutableBlock& mutable_block) const;
+
     // List of build blocks, constructed in prepare()
     Blocks _build_blocks;
     // Visited flags for each row in build side.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
index 36895d8111..a9598a11f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyticExpr.java
@@ -910,9 +910,8 @@ public class AnalyticExpr extends Expr {
     }
 
     @Override
-    protected Expr substituteImpl(ExprSubstitutionMap sMap, Analyzer analyzer)
-            throws AnalysisException {
-        Expr e = super.substituteImpl(sMap, analyzer);
+    protected Expr substituteImpl(ExprSubstitutionMap sMap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) {
+        Expr e = super.substituteImpl(sMap, disjunctsMap, analyzer);
         if (!(e instanceof AnalyticExpr)) {
             return e;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 85289d9044..9c5b2e0be1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -389,6 +389,10 @@ public class Analyzer {
 
         private final Map<SlotId, SlotId> equivalentSlots = Maps.newHashMap();
 
+        private final Map<String, TupleDescriptor> markTuples = Maps.newHashMap();
+
+        private final Map<TableRef, TupleId> markTupleIdByInnerRef = Maps.newHashMap();
+
         public GlobalState(Env env, ConnectContext context) {
             this.env = env;
             this.context = context;
@@ -649,6 +653,14 @@ public class Analyzer {
 
         tableRefMap.put(result.getId(), ref);
 
+        // for mark join
+        if (ref.getJoinOp() != null && ref.isMark()) {
+            TupleDescriptor markTuple = getDescTbl().createTupleDescriptor();
+            markTuple.setAliases(new String[]{ref.getMarkTupleName()}, true);
+            globalState.markTuples.put(ref.getMarkTupleName(), markTuple);
+            globalState.markTupleIdByInnerRef.put(ref, markTuple.getId());
+        }
+
         return result;
     }
 
@@ -865,7 +877,7 @@ public class Analyzer {
                                                 newTblName == null ? "table list" : newTblName.toString());
         }
 
-        Column col = d.getTable().getColumn(colName);
+        Column col = d.getTable() == null ? new Column(colName, ScalarType.BOOLEAN) : d.getTable().getColumn(colName);
         if (col == null) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, colName,
                                                 newTblName == null ? d.getTable().getName() : newTblName.toString());
@@ -937,7 +949,7 @@ public class Analyzer {
             }
         }
 
-        return result;
+        return result != null ? result : globalState.markTuples.get(tblName.toString());
     }
 
     private TupleDescriptor resolveColumnRef(String colName) throws AnalysisException {
@@ -1517,6 +1529,36 @@ public class Analyzer {
         return result;
     }
 
+    public List<Expr> getMarkConjuncts(TableRef ref) {
+        TupleId id = globalState.markTupleIdByInnerRef.get(ref);
+        if (id == null) {
+            return Collections.emptyList();
+        }
+        return getAllConjuncts(id);
+    }
+
+    public TupleDescriptor getMarkTuple(TableRef ref) {
+        TupleDescriptor markTuple = globalState.descTbl.getTupleDesc(globalState.markTupleIdByInnerRef.get(ref));
+        if (markTuple != null) {
+            markTuple.setIsMaterialized(true);
+            markTuple.getSlots().forEach(s -> s.setIsMaterialized(true));
+        }
+        return markTuple;
+    }
+
+    public List<Expr> getMarkConjuncts() {
+        List<Expr> exprs = Lists.newArrayList();
+        List<TupleId> markIds = Lists.newArrayList(globalState.markTupleIdByInnerRef.values());
+        for (Expr e : globalState.conjuncts.values()) {
+            List<TupleId> tupleIds = Lists.newArrayList();
+            e.getIds(tupleIds, null);
+            if (!Collections.disjoint(markIds, tupleIds)) {
+                exprs.add(e);
+            }
+        }
+        return exprs;
+    }
+
     /**
      * Get all predicates belonging to one or more tuples that have not yet been assigned
      * Since these predicates will be assigned by upper-level plan nodes in the future,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index e92bc720cf..79df8f3395 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -691,12 +691,17 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
      */
     public Expr trySubstitute(ExprSubstitutionMap smap, Analyzer analyzer,
                               boolean preserveRootType) throws AnalysisException {
+        return trySubstitute(smap, null, analyzer, preserveRootType);
+    }
+
+    public Expr trySubstitute(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer,
+            boolean preserveRootType) throws AnalysisException {
         Expr result = clone();
         // Return clone to avoid removing casts.
         if (smap == null) {
             return result;
         }
-        result = result.substituteImpl(smap, analyzer);
+        result = result.substituteImpl(smap, disjunctsMap, analyzer);
         result.analyze(analyzer);
         if (preserveRootType && !type.equals(result.getType())) {
             result = result.castTo(type);
@@ -717,8 +722,14 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
      */
     public Expr substitute(ExprSubstitutionMap smap, Analyzer analyzer, boolean preserveRootType)
             throws AnalysisException {
+        return substitute(smap, null, analyzer, preserveRootType);
+    }
+
+    public Expr substitute(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap,
+            Analyzer analyzer, boolean preserveRootType)
+            throws AnalysisException {
         try {
-            return trySubstitute(smap, analyzer, preserveRootType);
+            return trySubstitute(smap, disjunctsMap, analyzer, preserveRootType);
         } catch (AnalysisException e) {
             throw e;
         } catch (Exception e) {
@@ -755,10 +766,9 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
      * Exprs that have non-child exprs which should be affected by substitutions must
      * override this method and apply the substitution to such exprs as well.
      */
-    protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-            throws AnalysisException {
+    protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) {
         if (isImplicitCast()) {
-            return getChild(0).substituteImpl(smap, analyzer);
+            return getChild(0).substituteImpl(smap, disjunctsMap, analyzer);
         }
         if (smap != null) {
             Expr substExpr = smap.get(this);
@@ -766,8 +776,12 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                 return substExpr.clone();
             }
         }
+        if (Expr.IS_OR_PREDICATE.apply(this) && disjunctsMap != null) {
+            smap = disjunctsMap;
+            disjunctsMap = null;
+        }
         for (int i = 0; i < children.size(); ++i) {
-            children.set(i, children.get(i).substituteImpl(smap, analyzer));
+            children.set(i, children.get(i).substituteImpl(smap, disjunctsMap, analyzer));
         }
         // SlotRefs must remain analyzed to support substitution across query blocks. All
         // other exprs must be analyzed again after the substitution to add implicit casts
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index 937aeb7437..d20626e4c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -377,8 +377,7 @@ public class FunctionCallExpr extends Expr {
     }
 
     @Override
-    protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-            throws AnalysisException {
+    protected Expr substituteImpl(ExprSubstitutionMap smap, ExprSubstitutionMap disjunctsMap, Analyzer analyzer) {
         if (aggFnParams != null && aggFnParams.exprs() != null) {
             ArrayList<Expr> newParams = new ArrayList<Expr>();
             for (Expr expr : aggFnParams.exprs()) {
@@ -392,7 +391,7 @@ public class FunctionCallExpr extends Expr {
             aggFnParams = aggFnParams
                     .clone(newParams);
         }
-        return super.substituteImpl(smap, analyzer);
+        return super.substituteImpl(smap, disjunctsMap, analyzer);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
index a01739a78e..de33961db1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
@@ -37,12 +37,12 @@ public enum JoinOperator {
     // NOT IN subqueries. It can have a single equality join conjunct
     // that returns TRUE when the rhs is NULL.
     NULL_AWARE_LEFT_ANTI_JOIN("NULL AWARE LEFT ANTI JOIN",
-        TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN);
+            TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN);
 
     private final String  description;
     private final TJoinOp thriftJoinOp;
 
-    private JoinOperator(String description, TJoinOp thriftJoinOp) {
+    JoinOperator(String description, TJoinOp thriftJoinOp) {
         this.description = description;
         this.thriftJoinOp = thriftJoinOp;
     }
@@ -72,11 +72,11 @@ public enum JoinOperator {
     }
 
     public boolean isLeftSemiJoin() {
-        return this == LEFT_SEMI_JOIN;
+        return this.thriftJoinOp == TJoinOp.LEFT_SEMI_JOIN;
     }
 
     public boolean isInnerJoin() {
-        return this == INNER_JOIN;
+        return this.thriftJoinOp == TJoinOp.INNER_JOIN;
     }
 
     public boolean isAntiJoin() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 36ea44680a..26a0eb5e8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -711,6 +711,9 @@ public class SelectStmt extends QueryStmt {
         List<Expr> baseTblJoinConjuncts =
                 Expr.trySubstituteList(unassignedJoinConjuncts, baseTblSmap, analyzer, false);
         analyzer.materializeSlots(baseTblJoinConjuncts);
+        List<Expr> markConjuncts = analyzer.getMarkConjuncts();
+        markConjuncts = Expr.trySubstituteList(markConjuncts, baseTblSmap, analyzer, false);
+        analyzer.materializeSlots(markConjuncts);
 
         if (evaluateOrderBy) {
             // mark ordering exprs before marking agg/analytic exprs because they could contain
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index 92be4011e8..f837bf6af5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -20,8 +20,10 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
@@ -106,11 +108,13 @@ public class StmtRewriter {
         if (result.hasWhereClause()) {
             // Push negation to leaf operands.
             result.whereClause = Expr.pushNegationToOperands(result.whereClause);
-            // Check if we can equal the subqueries in the WHERE clause. OR predicates with
-            // subqueries are not supported.
-            if (hasSubqueryInDisjunction(result.whereClause)) {
-                throw new AnalysisException("Subqueries in OR predicates are not supported: "
-                        + result.whereClause.toSql());
+            if (ConnectContext.get() == null || !ConnectContext.get().getSessionVariable().enableVectorizedEngine()) {
+                // Check if we can equal the subqueries in the WHERE clause. OR predicates with
+                // subqueries are not supported.
+                if (hasSubqueryInDisjunction(result.whereClause)) {
+                    throw new AnalysisException("Subqueries in OR predicates are not supported: "
+                            + result.whereClause.toSql());
+                }
             }
             rewriteWhereClauseSubqueries(result, analyzer);
         }
@@ -355,6 +359,24 @@ public class StmtRewriter {
         return false;
     }
 
+    private static void extractExprWithSubquery(boolean inDisjunct, Expr expr,
+            List<Expr> subqueryExprInConjunct, List<Expr> subqueryExprInDisjunct) {
+        if (!(expr instanceof CompoundPredicate)) {
+            if (expr.contains(Subquery.class)) {
+                if (inDisjunct) {
+                    subqueryExprInDisjunct.add(expr);
+                } else {
+                    subqueryExprInConjunct.add(expr);
+                }
+            }
+        } else {
+            for (Expr child : expr.getChildren()) {
+                extractExprWithSubquery(inDisjunct || Expr.IS_OR_PREDICATE.apply(expr), child,
+                        subqueryExprInConjunct, subqueryExprInDisjunct);
+            }
+        }
+    }
+
     /**
      * Rewrite all subqueries of a stmt's WHERE clause. Initially, all the
      * conjuncts containing subqueries are extracted from the WHERE clause and are
@@ -408,59 +430,42 @@ public class StmtRewriter {
      * ON $a$1.a = T1.a
      * WHERE T1.c < 10;
      */
-    private static void rewriteWhereClauseSubqueries(
-            SelectStmt stmt, Analyzer analyzer)
+    // TODO(mark join) need support mark join
+    private static void rewriteWhereClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
             throws AnalysisException {
         int numTableRefs = stmt.fromClause.size();
-        ArrayList<Expr> exprsWithSubqueries = Lists.newArrayList();
-        ExprSubstitutionMap smap = new ExprSubstitutionMap();
+        ArrayList<Expr> exprsWithSubqueriesInConjuncts = Lists.newArrayList();
+        ArrayList<Expr> exprsWithSubqueriesInDisjuncts = Lists.newArrayList();
+        ExprSubstitutionMap conjunctsSmap = new ExprSubstitutionMap();
+        ExprSubstitutionMap disjunctsSmap = new ExprSubstitutionMap();
+        List<TupleDescriptor> markTuples = Lists.newArrayList();
+        List<Expr> subqueryInConjunct = Lists.newArrayList();
+        List<Expr> subqueryInDisjunct = Lists.newArrayList();
         // Check if all the conjuncts in the WHERE clause that contain subqueries
         // can currently be rewritten as a join.
-        for (Expr conjunct : stmt.whereClause.getConjuncts()) {
-            List<Subquery> subqueries = Lists.newArrayList();
-            conjunct.collectAll(Predicates.instanceOf(Subquery.class), subqueries);
-            if (subqueries.size() == 0) {
-                continue;
-            }
-            if (subqueries.size() > 1) {
-                throw new AnalysisException("Multiple subqueries are not supported in "
-                        + "expression: " + conjunct.toSql());
-            }
-            if (!(conjunct instanceof InPredicate)
-                    && !(conjunct instanceof ExistsPredicate)
-                    && !(conjunct instanceof BinaryPredicate)
-                    && !conjunct.contains(Expr.IS_SCALAR_SUBQUERY)) {
-                throw new AnalysisException("Non-scalar subquery is not supported in "
-                        + "expression: "
-                        + conjunct.toSql());
-            }
-
-            if (conjunct instanceof ExistsPredicate) {
-                // Check if we can determine the result of an ExistsPredicate during analysis.
-                // If so, replace the predicate with a BoolLiteral predicate and remove it from
-                // the list of predicates to be rewritten.
-                BoolLiteral boolLiteral = replaceExistsPredicate((ExistsPredicate) conjunct);
-                if (boolLiteral != null) {
-                    boolLiteral.analyze(analyzer);
-                    smap.put(conjunct, boolLiteral);
-                    continue;
-                }
-            }
-
-            // Replace all the supported exprs with subqueries with true BoolLiterals
-            // using an smap.
-            BoolLiteral boolLiteral = new BoolLiteral(true);
-            boolLiteral.analyze(analyzer);
-            smap.put(conjunct, boolLiteral);
-            exprsWithSubqueries.add(conjunct);
+        // TODO(mark join) traverse expr tree to process subquery.
+        extractExprWithSubquery(false, stmt.whereClause, subqueryInConjunct, subqueryInDisjunct);
+        for (Expr conjunct : subqueryInConjunct) {
+            processOneSubquery(stmt, exprsWithSubqueriesInConjuncts,
+                    conjunctsSmap, markTuples, conjunct, analyzer, false);
         }
-        stmt.whereClause = stmt.whereClause.substitute(smap, analyzer, false);
+        for (Expr conjunct : subqueryInDisjunct) {
+            processOneSubquery(stmt, exprsWithSubqueriesInDisjuncts,
+                    disjunctsSmap, markTuples, conjunct, analyzer, true);
+        }
+        stmt.whereClause = stmt.whereClause.substitute(conjunctsSmap, disjunctsSmap, analyzer, false);
 
         boolean hasNewVisibleTuple = false;
         // Recursively equal all the exprs that contain subqueries and merge them
         // with 'stmt'.
-        for (Expr expr : exprsWithSubqueries) {
-            if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer)) {
+        for (Expr expr : exprsWithSubqueriesInConjuncts) {
+            if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, null)) {
+                hasNewVisibleTuple = true;
+            }
+        }
+        for (int i = 0; i < exprsWithSubqueriesInDisjuncts.size(); i++) {
+            Expr expr = exprsWithSubqueriesInDisjuncts.get(i);
+            if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, markTuples.get(i))) {
                 hasNewVisibleTuple = true;
             }
         }
@@ -472,6 +477,65 @@ public class StmtRewriter {
         }
     }
 
+    private static void processOneSubquery(SelectStmt stmt, // validate one predicate; register its stand-in
+            List<Expr> exprsWithSubqueries, ExprSubstitutionMap smap, List<TupleDescriptor> markTuples,
+            Expr exprWithSubquery, Analyzer analyzer, boolean isMark) throws AnalysisException {
+        List<Subquery> subqueries = Lists.newArrayList();
+        exprWithSubquery.collectAll(Predicates.instanceOf(Subquery.class), subqueries);
+        if (subqueries.size() == 0) { // nothing to rewrite
+            return;
+        }
+        if (subqueries.size() > 1) { // at most one subquery per predicate is accepted
+            throw new AnalysisException("Multiple subqueries are not supported in "
+                    + "expression: " + exprWithSubquery.toSql());
+        }
+        if (!(exprWithSubquery instanceof InPredicate)
+                && !(exprWithSubquery instanceof ExistsPredicate)
+                && !(exprWithSubquery instanceof BinaryPredicate)
+                && !exprWithSubquery.contains(Expr.IS_SCALAR_SUBQUERY)) { // IN, EXISTS, binary or scalar forms only
+            throw new AnalysisException("Non-scalar subquery is not supported in "
+                    + "expression: "
+                    + exprWithSubquery.toSql());
+        }
+
+        if (exprWithSubquery instanceof ExistsPredicate) {
+            // replaceExistsPredicate() returns non-null when the EXISTS result is already
+            // known during analysis. In that case the literal goes straight into smap and
+            // the predicate is NOT added to exprsWithSubqueries (or markTuples).
+            BoolLiteral boolLiteral = replaceExistsPredicate((ExistsPredicate) exprWithSubquery);
+            if (boolLiteral != null) {
+                boolLiteral.analyze(analyzer);
+                smap.put(exprWithSubquery, boolLiteral);
+                return;
+            }
+        }
+
+        // Record the predicate for rewriting and put a placeholder for it into smap:
+        // a mark-slot SlotRef when isMark is set, otherwise a true BoolLiteral.
+        if (isMark) {
+            // The placeholder is a SlotRef on a fresh one-slot tuple: BOOLEAN, materialized, non-nullable.
+            TupleDescriptor markTuple = analyzer.getDescTbl().createTupleDescriptor();
+            markTuple.setAliases(new String[]{stmt.getTableAliasGenerator().getNextAlias()}, true);
+            SlotDescriptor markSlot = analyzer.addSlotDescriptor(markTuple);
+            String slotName = stmt.getColumnAliasGenerator().getNextAlias();
+            markSlot.setType(ScalarType.BOOLEAN);
+            markSlot.setIsMaterialized(true);
+            markSlot.setIsNullable(false);
+            markSlot.setColumn(new Column(slotName, ScalarType.BOOLEAN));
+            SlotRef markRef = new SlotRef(markSlot);
+            markRef.setTblName(new TableName(null, null, markTuple.getAlias()));
+            markRef.setLabel(slotName);
+            smap.put(exprWithSubquery, markRef);
+            markTuples.add(markTuple); // added in lockstep with exprsWithSubqueries within this branch
+            exprsWithSubqueries.add(exprWithSubquery);
+        } else {
+            BoolLiteral boolLiteral = new BoolLiteral(true);
+            boolLiteral.analyze(analyzer);
+            smap.put(exprWithSubquery, boolLiteral);
+            exprsWithSubqueries.add(exprWithSubquery);
+        }
+    }
+
 
     /**
      * Replace an ExistsPredicate that contains a subquery with a BoolLiteral if we
@@ -549,7 +613,7 @@ public class StmtRewriter {
      * @throws AnalysisException
      */
     private static boolean mergeExpr(SelectStmt stmt, Expr expr,
-                                     Analyzer analyzer) throws AnalysisException {
+            Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException {
         // LOG.warn("dhc mergeExpr stmt={} expr={}", stmt, expr);
         LOG.debug("SUBQUERY mergeExpr stmt={} expr={}", stmt.toSql(), expr.toSql());
         Preconditions.checkNotNull(expr);
@@ -564,7 +628,7 @@ public class StmtRewriter {
         // to eliminate any chance that column aliases from the parent query could reference
         // select items from the inline view after the equal.
         List<String> colLabels = Lists.newArrayList();
-        // add a new alias for all of columns in subquery
+        // add a new alias for all columns in subquery
         for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) {
             colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias());
         }
@@ -598,7 +662,7 @@ public class StmtRewriter {
         }
 
         /*
-         * Situation: The expr is a uncorrelated subquery for outer stmt.
+         * Situation: The expr is an uncorrelated subquery for outer stmt.
          * Rewrite: Add a limit 1 for subquery.
          * origin stmt: select * from t1 where exists (select * from table2);
          * expr: exists (select * from table2)
@@ -612,7 +676,7 @@ public class StmtRewriter {
         }
 
         // Analyzing the inline view trigger reanalysis of the subquery's select statement.
-        // However the statement is already analyzed and since statement analysis is not
+        // However, the statement is already analyzed and since statement analysis is not
         // idempotent, the analysis needs to be reset (by a call to clone()).
         // inlineView = (InlineViewRef) inlineView.clone();
         inlineView.reset();
@@ -627,8 +691,7 @@ public class StmtRewriter {
         JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN;
 
         // Create a join conjunct from the expr that contains a subquery.
-        Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer,
-                !onClauseConjuncts.isEmpty());
+        Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer, !onClauseConjuncts.isEmpty());
         if (joinConjunct != null) {
             SelectListItem firstItem =
                     ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
@@ -641,8 +704,16 @@ public class StmtRewriter {
                 // by the subquery due to some predicate. The new join conjunct is added to
                 // stmt's WHERE clause because it needs to be applied to the result of the
                 // LEFT OUTER JOIN (both matched and unmatched tuples).
-                stmt.whereClause =
-                        CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
+                if (markTuple != null) {
+                    // replace
+                    ExprSubstitutionMap smap = new ExprSubstitutionMap();
+                    smap.put(new SlotRef(markTuple.getSlots().get(0)), joinConjunct);
+                    stmt.whereClause.substitute(smap);
+                    markTuple = null;
+                } else {
+                    stmt.whereClause =
+                            CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
+                }
                 joinConjunct = null;
                 joinOp = JoinOperator.LEFT_OUTER_JOIN;
                 updateSelectList = true;
@@ -666,7 +737,9 @@ public class StmtRewriter {
             // subquery using a CROSS JOIN.
             // TODO This is very expensive. Remove it when we implement independent
             // subquery evaluation.
-            inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
+            joinOp = JoinOperator.CROSS_JOIN;
+            inlineView.setMark(markTuple);
+            inlineView.setJoinOp(joinOp);
             LOG.warn("uncorrelated subquery rewritten using a cross join");
             // Indicate that new visible tuples may be added in stmt's select list.
             return true;
@@ -732,10 +805,19 @@ public class StmtRewriter {
                 joinOp = JoinOperator.CROSS_JOIN;
                 // We can equal the aggregate subquery using a cross join. All conjuncts
                 // that were extracted from the subquery are added to stmt's WHERE clause.
-                stmt.whereClause =
-                        CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
+                if (markTuple != null) {
+                    // replace
+                    ExprSubstitutionMap markSmap = new ExprSubstitutionMap();
+                    markSmap.put(new SlotRef(markTuple.getSlots().get(0)), onClausePredicate);
+                    stmt.whereClause.substitute(markSmap);
+                    markTuple = null;
+                } else {
+                    stmt.whereClause =
+                            CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
+                }
             }
 
+            inlineView.setMark(markTuple);
             inlineView.setJoinOp(joinOp);
             if (joinOp != JoinOperator.CROSS_JOIN) {
                 inlineView.setOnClause(onClausePredicate);
@@ -752,7 +834,10 @@ public class StmtRewriter {
                 && ((ExistsPredicate) expr).isNotExists()) {
             // For the case of a NOT IN with an eq join conjunct, replace the join
             // conjunct with a conjunct that uses the null-matching eq operator.
-            if (expr instanceof InPredicate) {
+            // TODO: mark join only works on nested loop join now, and NLJ do NOT support NULL_AWARE_LEFT_ANTI_JOIN
+            //     remove markTuple == null when nested loop join support NULL_AWARE_LEFT_ANTI_JOIN
+            //     or plan mark join on hash join
+            if (expr instanceof InPredicate && markTuple == null) {
                 joinOp = VectorizedUtil.isVectorized()
                         ? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : JoinOperator.LEFT_ANTI_JOIN;
                 List<TupleId> tIds = Lists.newArrayList();
@@ -777,6 +862,7 @@ public class StmtRewriter {
             }
         }
 
+        inlineView.setMark(markTuple);
         inlineView.setJoinOp(joinOp);
         inlineView.setOnClause(onClausePredicate);
         return updateSelectList;
@@ -850,8 +936,7 @@ public class StmtRewriter {
         ArrayList<Expr> correlatedPredicates = Lists.newArrayList();
 
         if (subqueryStmt.hasWhereClause()) {
-            if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(),
-                    subqueryTupleIds)) {
+            if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(), subqueryTupleIds)) {
                 throw new AnalysisException("Disjunctions with correlated predicates "
                         + "are not supported: " + subqueryStmt.getWhereClause().toSql());
             }
@@ -898,15 +983,13 @@ public class StmtRewriter {
      * replace them with true BoolLiterals. The modified expr tree is returned
      * and the extracted correlated predicates are added to 'matches'.
      */
-    private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds,
-                                                    ArrayList<Expr> matches) {
+    private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds, ArrayList<Expr> matches) {
         if (isCorrelatedPredicate(root, tupleIds)) {
             matches.add(root);
             return new BoolLiteral(true);
         }
         for (int i = 0; i < root.getChildren().size(); ++i) {
-            root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds,
-                    matches));
+            root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, matches));
         }
         return root;
     }
@@ -933,7 +1016,7 @@ public class StmtRewriter {
 
     /**
      * Checks if an expr containing a correlated subquery is eligible for equal by
-     * tranforming into a join. 'correlatedPredicates' contains the correlated
+     * transforming into a join. 'correlatedPredicates' contains the correlated
      * predicates identified in the subquery. Throws an AnalysisException if 'expr'
      * is not eligible for equal.
      * TODO: Merge all the equal eligibility tests into a single function.
@@ -953,26 +1036,19 @@ public class StmtRewriter {
             SelectListItem item = stmt.getSelectList().getItems().get(0);
             if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) {
                 throw new AnalysisException("The select item in correlated subquery of binary predicate should only "
-                        + "be sum, min, max, avg and count. Current subquery:"
-                        + stmt.toSql());
+                        + "be sum, min, max, avg and count. Current subquery:" + stmt.toSql());
             }
         }
         // Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in
         // predicate.
         if (expr instanceof InPredicate && (stmt.hasAggInfo() || stmt.hasAnalyticInfo())) {
             LOG.warn("canRewriteCorrelatedSubquery fail, expr={} subquery={}", expr.toSql(), stmt.toSql());
-            throw new AnalysisException("Unsupported correlated subquery with grouping "
-                    + "and/or aggregation: "
-                    + stmt.toSql());
+            throw new AnalysisException("Unsupported correlated subquery"
+                    + " with grouping and/or aggregation: " + stmt.toSql());
         }
 
         final com.google.common.base.Predicate<Expr> isSingleSlotRef =
-                new com.google.common.base.Predicate<Expr>() {
-                    @Override
-                    public boolean apply(Expr arg) {
-                        return arg.unwrapSlotRef(false) != null;
-                    }
-                };
+                arg -> arg.unwrapSlotRef(false) != null;
 
         // A HAVING clause is only allowed on correlated EXISTS subqueries with
         // correlated binary predicates of the form Slot = Slot (see IMPALA-2734)
@@ -1099,7 +1175,7 @@ public class StmtRewriter {
      * the aggregate function is wrapped into a 'zeroifnull' function.
      */
     private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView,
-                                           Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
+            Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
         Preconditions.checkNotNull(exprWithSubquery);
         Preconditions.checkNotNull(inlineView);
         Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
@@ -1134,8 +1210,8 @@ public class StmtRewriter {
                 ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
         if (isCorrelated && item.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
             // TODO: Add support for multiple agg functions that return non-null on an
-            // empty input, by wrapping them with zeroifnull functions before the inline
-            // view is analyzed.
+            //   empty input, by wrapping them with zeroifnull functions before the inline
+            //   view is analyzed.
             if (!Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr())
                     && (!(item.getExpr() instanceof CastExpr)
                     || !Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr().getChild(0)))) {
@@ -1149,12 +1225,12 @@ public class StmtRewriter {
             // TODO Generalize this by making the aggregate functions aware of the
             // literal expr that they return on empty input, e.g. max returns a
             // NullLiteral whereas count returns a NumericLiteral.
-            if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isNumericType()) {
+            if (aggFns.get(0).getFn().getReturnType().isNumericType()) {
                 FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull",
                         Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
                 zeroIfNull.analyze(analyzer);
                 subquerySubstitute = zeroIfNull;
-            } else if (((FunctionCallExpr) aggFns.get(0)).getFn().getReturnType().isStringType()) {
+            } else if (aggFns.get(0).getFn().getReturnType().isStringType()) {
                 List<Expr> params = Lists.newArrayList();
                 params.add(slotRef);
                 params.add(new StringLiteral(""));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 5e6dafc6ce..d960587b13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -91,6 +91,8 @@ public class TableRef implements ParseNode, Writable {
     // Indicates whether this table ref is given an explicit alias,
     protected boolean hasExplicitAlias;
     protected JoinOperator joinOp;
+    protected boolean isMark;
+    protected String markTupleName;
     protected List<String> usingColNames;
     protected ArrayList<LateralViewRef> lateralViewRefs;
     protected Expr onClause;
@@ -175,6 +177,8 @@ public class TableRef implements ParseNode, Writable {
         aliases = other.aliases;
         hasExplicitAlias = other.hasExplicitAlias;
         joinOp = other.joinOp;
+        isMark = other.isMark;
+        markTupleName = other.markTupleName;
         // NOTE: joinHints and sortHints maybe changed after clone. so we new one List.
         joinHints =
                 (other.joinHints != null) ? Lists.newArrayList(other.joinHints) : null;
@@ -261,6 +265,23 @@ public class TableRef implements ParseNode, Writable {
         this.joinOp = op;
     }
 
+    public boolean isMark() { // mark flag of this table ref; set via setMark()
+        return isMark;
+    }
+
+    public String getMarkTupleName() { // alias of the mark tuple, or null when this ref is not marked
+        return markTupleName;
+    }
+
+    public void setMark(TupleDescriptor markTuple) { // passing null clears the mark
+        this.isMark = markTuple != null;
+        if (isMark) {
+            this.markTupleName = markTuple.getAlias(); // only the alias is kept, not the descriptor itself
+        } else {
+            this.markTupleName = null;
+        }
+    }
+
     public Expr getOnClause() {
         return onClause;
     }
@@ -651,7 +672,7 @@ public class TableRef implements ParseNode, Writable {
             case NULL_AWARE_LEFT_ANTI_JOIN:
                 return "NULL AWARE LEFT ANTI JOIN";
             default:
-                return "bad join op: " + joinOp.toString();
+                return "bad join op: " + joinOp;
         }
     }
 
@@ -792,6 +813,8 @@ public class TableRef implements ParseNode, Writable {
      */
     protected void setJoinAttrs(TableRef other) {
         this.joinOp = other.joinOp;
+        this.isMark = other.isMark;
+        this.markTupleName = other.markTupleName;
         this.joinHints = other.joinHints;
         // this.tableHints_ = other.tableHints_;
         this.onClause = other.onClause;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index d1e94c0f9f..963a4feda0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -110,6 +110,7 @@ public abstract class JoinNodeBase extends PlanNode {
     }
 
     protected void computeOutputTuple(Analyzer analyzer) throws UserException {
+        // TODO(mark join) if it is mark join use mark tuple instead?
         // 1. create new tuple
         vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
         boolean copyLeft = false;
@@ -338,13 +339,16 @@ public abstract class JoinNodeBase extends PlanNode {
 
     protected abstract void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap);
 
-    protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException {
+    protected void computeIntermediateTuple(Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException {
         // 1. create new tuple
         TupleDescriptor vIntermediateLeftTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
         TupleDescriptor vIntermediateRightTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
         vIntermediateTupleDescList = new ArrayList<>();
         vIntermediateTupleDescList.add(vIntermediateLeftTupleDesc);
         vIntermediateTupleDescList.add(vIntermediateRightTupleDesc);
+        if (markTuple != null) {
+            vIntermediateTupleDescList.add(markTuple);
+        }
         boolean leftNullable = false;
         boolean rightNullable = false;
 
@@ -446,7 +450,11 @@ public abstract class JoinNodeBase extends PlanNode {
     public void finalize(Analyzer analyzer) throws UserException {
         super.finalize(analyzer);
         if (VectorizedUtil.isVectorized()) {
-            computeIntermediateTuple(analyzer);
+            TupleDescriptor markTuple = null;
+            if (innerRef != null) {
+                markTuple = analyzer.getMarkTuple(innerRef);
+            }
+            computeIntermediateTuple(analyzer, markTuple);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 7a34084ec8..6adb64cd4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -176,6 +176,7 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         if (vJoinConjunct != null) {
             msg.nested_loop_join_node.setVjoinConjunct(vJoinConjunct.treeToThrift());
         }
+        msg.nested_loop_join_node.setIsMark(innerRef != null && innerRef.isMark());
         if (vSrcToOutputSMap != null) {
             for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
                 // TODO: Enable it after we support new optimizers
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 5e1973aaed..fd24403179 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -931,8 +931,7 @@ public class SingleNodePlanner {
      * subplan ref are materialized by a join node added during plan generation.
      */
     // (ML): change the function name
-    private PlanNode createJoinPlan(Analyzer analyzer,
-                                    TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
+    private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
             throws UserException {
         LOG.debug("Try to create a query plan starting with " + leftmostRef.getUniqueAlias());
 
@@ -2068,18 +2067,21 @@ public class SingleNodePlanner {
             ojConjuncts = analyzer.getUnassignedConjuncts(tupleIds, false);
         }
         analyzer.markConjunctsAssigned(ojConjuncts);
-        if (eqJoinConjuncts.isEmpty()) {
+        if (eqJoinConjuncts.isEmpty() || innerRef.isMark()) {
             NestedLoopJoinNode result =
                     new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner, innerRef);
-            result.setJoinConjuncts(ojConjuncts);
+            List<Expr> joinConjuncts = Lists.newArrayList(eqJoinConjuncts);
+            joinConjuncts.addAll(ojConjuncts);
+            result.setJoinConjuncts(joinConjuncts);
+            result.addConjuncts(analyzer.getMarkConjuncts(innerRef));
             result.init(analyzer);
             return result;
         }
 
-        HashJoinNode result =
-                new HashJoinNode(ctx.getNextNodeId(), outer, inner, innerRef, eqJoinConjuncts,
-                        ojConjuncts);
+        HashJoinNode result = new HashJoinNode(ctx.getNextNodeId(), outer, inner,
+                innerRef, eqJoinConjuncts, ojConjuncts);
         result.init(analyzer);
+        result.addConjuncts(analyzer.getMarkConjuncts(innerRef));
         return result;
     }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 7cadebd8f6..b14610587a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -623,6 +623,8 @@ struct TNestedLoopJoinNode {
   5: optional bool is_output_left_side_only
 
   6: optional Exprs.TExpr vjoin_conjunct
+
+  7: optional bool is_mark
 }
 
 struct TMergeJoinNode {
diff --git a/regression-test/data/correctness/test_subquery_in_disjunction.out b/regression-test/data/correctness/test_subquery_in_disjunction.out
new file mode 100644
index 0000000000..4259f027ea
--- /dev/null
+++ b/regression-test/data/correctness/test_subquery_in_disjunction.out
@@ -0,0 +1,26 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !in --
+1	2	3
+10	20	30
+
+-- !scalar --
+1	2	3
+100	200	300
+
+-- !exists_true --
+1	2	3
+10	20	30
+100	200	300
+
+-- !in_exists_false --
+1	2	3
+
+-- !not_in --
+1	2	3
+10	20	30
+100	200	300
+
+-- !not_in_covered --
+1	2	3
+100	200	300
+
diff --git a/regression-test/suites/correctness/test_subquery_in_disjunction.groovy b/regression-test/suites/correctness/test_subquery_in_disjunction.groovy
new file mode 100644
index 0000000000..70b280addd
--- /dev/null
+++ b/regression-test/suites/correctness/test_subquery_in_disjunction.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_subquery_in_disjunction") { // Regression suite: subquery predicates (IN / scalar / EXISTS / NOT IN) OR-ed with a plain predicate in WHERE.
+    sql """ DROP TABLE IF EXISTS test_sq_dj1 """ // Drop leftovers from a previous run before recreating both tables.
+    sql """ DROP TABLE IF EXISTS test_sq_dj2 """
+    sql """
+    CREATE TABLE `test_sq_dj1` (
+        `c1` int(11) NULL,
+        `c2` int(11) NULL,
+        `c3` int(11) NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`c1`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "disable_auto_compaction" = "false"
+    );
+    """ // test_sq_dj1: the outer table every query below selects from (three nullable int columns).
+    sql """
+    CREATE TABLE `test_sq_dj2` (
+        `c1` int(11) NULL,
+        `c2` int(11) NULL,
+        `c3` int(11) NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`c1`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "disable_auto_compaction" = "false"
+    );
+    """ // test_sq_dj2: the table read inside every subquery; same schema as test_sq_dj1.
+    sql """
+        insert into test_sq_dj1 values(1, 2, 3), (10, 20, 30), (100, 200, 300)
+    """ // Outer rows have c1 = 1, 10 and 100.
+    sql """
+        insert into test_sq_dj2 values(10, 20, 30)
+    """ // The subquery table holds a single row with c1 = 10.
+
+    order_qt_in """
+        SELECT * FROM test_sq_dj1 WHERE c1 IN (SELECT c1 FROM test_sq_dj2) OR c1 < 10;
+    """ // IN subquery OR c1 < 10: c1 = 10 matches the subquery, c1 = 1 matches the plain predicate.
+
+    order_qt_scalar """
+        SELECT * FROM test_sq_dj1 WHERE c1 > (SELECT AVG(c1) FROM test_sq_dj2) OR c1 < 10;
+    """ // Scalar subquery OR c1 < 10: AVG(c1) over the single test_sq_dj2 row is 10, so c1 = 100 and c1 = 1 qualify.
+
+    order_qt_exists_true """
+        SELECT * FROM test_sq_dj1 WHERE EXISTS (SELECT c1 FROM test_sq_dj2 WHERE c1 = 10) OR c1 < 10;
+    """ // EXISTS is true (test_sq_dj2 has a c1 = 10 row), so all three outer rows qualify.
+
+    order_qt_in_exists_false """
+        SELECT * FROM test_sq_dj1 WHERE EXISTS (SELECT c1 FROM test_sq_dj2 WHERE c1 > 10) OR c1 < 10;
+    """ // EXISTS is false (no test_sq_dj2 row has c1 > 10), so only the c1 < 10 branch keeps a row (c1 = 1).
+
+    order_qt_not_in """
+        SELECT * FROM test_sq_dj1 WHERE c1 NOT IN (SELECT c1 FROM test_sq_dj2) OR c1 = 10;
+    """ // NOT IN drops only c1 = 10, which the OR c1 = 10 branch adds back: all three rows qualify.
+
+    order_qt_not_in_covered """
+        SELECT * FROM test_sq_dj1 WHERE c1 NOT IN (SELECT c1 FROM test_sq_dj2) OR c1 = 100;
+    """ // The OR c1 = 100 branch is already covered by NOT IN; c1 = 10 stays excluded, leaving c1 = 1 and 100.
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org