You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by hu...@apache.org on 2022/07/29 04:44:13 UTC

[doris] branch master updated: [feature](Nereids) Add subquery analyze (#11300)

This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a60cfab844 [feature](Nereids) Add subquery analyze (#11300)
a60cfab844 is described below

commit a60cfab844cd4608c406c1df55cb6bee175dfa60
Author: zhengshiJ <32...@users.noreply.github.com>
AuthorDate: Fri Jul 29 12:44:04 2022 +0800

    [feature](Nereids) Add subquery analyze (#11300)
    
    Increase the parsing of subquery.
    
    Add LogicalApply and LogicalCorrelatedJoin and LogicalEnforceSingleRow.
    (These structures are temporarily in use, in preparation for the follow-up)
    
    LogicalApply:
    Apply Node for subquery.
    Use this node to display the subquery in the relational algebra tree.
    refer to "Orthogonal Optimization of Subqueries and Aggregation"
    
    LogicalCorrelatedJoin:
    A relational algebra node with join type converted from apply node to subquery.
    
    LogicalEnforceSingleRow:
    Guaranteed to return a result of 1 row.
---
 .../doris/nereids/analyzer/NereidsAnalyzer.java    |  23 +++-
 .../doris/nereids/jobs/batch/AnalyzeRulesJob.java  |  10 +-
 .../nereids/rules/analysis/BindSlotReference.java  |  44 +++++--
 .../apache/doris/nereids/rules/analysis/Scope.java |  65 ++++++++++
 .../visitor/DefaultSubExprRewriter.java            |  60 +++++++++
 .../apache/doris/nereids/trees/plans/PlanType.java |   3 +
 .../nereids/trees/plans/logical/LogicalApply.java  | 117 ++++++++++++++++++
 .../trees/plans/logical/LogicalCorrelatedJoin.java | 135 +++++++++++++++++++++
 .../plans/logical/LogicalEnforceSingleRow.java     |  98 +++++++++++++++
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  15 +++
 .../nereids/trees/expressions/SubqueryTest.java    |  52 ++++++--
 11 files changed, 596 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/NereidsAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/NereidsAnalyzer.java
index 16daf6ec40..a21306e147 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/NereidsAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/NereidsAnalyzer.java
@@ -21,10 +21,13 @@ import org.apache.doris.nereids.PlannerContext;
 import org.apache.doris.nereids.jobs.batch.AnalyzeRulesJob;
 import org.apache.doris.nereids.memo.Memo;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.rules.analysis.Scope;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.qe.ConnectContext;
 
+import java.util.Optional;
+
 /**
  * Bind symbols according to metadata in the catalog, perform semantic analysis, etc.
  * TODO: revisit the interface after subquery analysis is supported.
@@ -40,14 +43,21 @@ public class NereidsAnalyzer {
      * Analyze plan.
      */
     public LogicalPlan analyze(Plan plan) {
-        return (LogicalPlan) analyzeWithPlannerContext(plan).getMemo().copyOut();
+        return analyze(plan, Optional.empty());
+    }
+
+    /**
+     * Analyze plan with scope.
+     */
+    public LogicalPlan analyze(Plan plan, Optional<Scope> scope) {
+        return (LogicalPlan) analyzeWithPlannerContext(plan, scope).getMemo().copyOut();
     }
 
     /**
      * Convert SQL String to analyzed plan.
      */
     public LogicalPlan analyze(String sql) {
-        return analyze(parse(sql));
+        return analyze(parse(sql), Optional.empty());
     }
 
     /**
@@ -56,11 +66,18 @@ public class NereidsAnalyzer {
      * further plan optimization without creating new {@link Memo} and {@link PlannerContext}.
      */
     public PlannerContext analyzeWithPlannerContext(Plan plan) {
+        return analyzeWithPlannerContext(plan, Optional.empty());
+    }
+
+    /**
+     * Analyze plan with scope.
+     */
+    public PlannerContext analyzeWithPlannerContext(Plan plan, Optional<Scope> scope) {
         PlannerContext plannerContext = new Memo(plan)
                 .newPlannerContext(connectContext)
                 .setDefaultJobContext();
 
-        new AnalyzeRulesJob(plannerContext).execute();
+        new AnalyzeRulesJob(plannerContext, scope).execute();
         return plannerContext;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/AnalyzeRulesJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/AnalyzeRulesJob.java
index d8273b03c0..0421e66d7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/AnalyzeRulesJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/AnalyzeRulesJob.java
@@ -22,24 +22,28 @@ import org.apache.doris.nereids.rules.analysis.BindFunction;
 import org.apache.doris.nereids.rules.analysis.BindRelation;
 import org.apache.doris.nereids.rules.analysis.BindSlotReference;
 import org.apache.doris.nereids.rules.analysis.ProjectToGlobalAggregate;
+import org.apache.doris.nereids.rules.analysis.Scope;
 
 import com.google.common.collect.ImmutableList;
 
+import java.util.Optional;
+
 /**
  * Execute the analysis rules.
  */
 public class AnalyzeRulesJob extends BatchRulesJob {
 
     /**
-     * Execute the analysis job
+     * Execute the analysis job with scope.
      * @param plannerContext planner context for execute job
+     * @param scope Parse the symbolic scope of the field
      */
-    public AnalyzeRulesJob(PlannerContext plannerContext) {
+    public AnalyzeRulesJob(PlannerContext plannerContext, Optional<Scope> scope) {
         super(plannerContext);
         rulesJob.addAll(ImmutableList.of(
                 bottomUpBatch(ImmutableList.of(
                         new BindRelation(),
-                        new BindSlotReference(),
+                        new BindSlotReference(scope),
                         new BindFunction(),
                         new ProjectToGlobalAggregate())
                 )));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java
index c76fb4b6a9..2a2ab691d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSlotReference.java
@@ -28,7 +28,7 @@ import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
+import org.apache.doris.nereids.trees.expressions.visitor.DefaultSubExprRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -48,6 +48,20 @@ import java.util.stream.Stream;
  * BindSlotReference.
  */
 public class BindSlotReference implements AnalysisRuleFactory {
+    private final Optional<Scope> outerScope;
+
+    public BindSlotReference(Optional<Scope> outputScope) {
+        this.outerScope = outputScope;
+    }
+
+    private Scope toScope(List<Slot> slots) {
+        if (outerScope.isPresent()) {
+            return new Scope(outerScope, slots);
+        } else {
+            return new Scope(slots);
+        }
+    }
+
     @Override
     public List<Rule> buildRules() {
         return ImmutableList.of(
@@ -115,15 +129,14 @@ public class BindSlotReference implements AnalysisRuleFactory {
         List<Slot> boundedSlots = inputs.stream()
                 .flatMap(input -> input.getOutput().stream())
                 .collect(Collectors.toList());
-        return (E) new SlotBinder(boundedSlots, plan).bind(expr);
+        return (E) new SlotBinder(toScope(boundedSlots), plan).bind(expr);
     }
 
-    private class SlotBinder extends DefaultExpressionRewriter<Void> {
-        private final List<Slot> boundSlots;
+    private class SlotBinder extends DefaultSubExprRewriter<Void> {
         private final Plan plan;
 
-        public SlotBinder(List<Slot> boundSlots, Plan plan) {
-            this.boundSlots = boundSlots;
+        public SlotBinder(Scope scope, Plan plan) {
+            super(scope);
             this.plan = plan;
         }
 
@@ -144,15 +157,22 @@ public class BindSlotReference implements AnalysisRuleFactory {
 
         @Override
         public Slot visitUnboundSlot(UnboundSlot unboundSlot, Void context) {
-            List<Slot> bounded = bindSlot(unboundSlot, boundSlots);
+            Optional<List<Slot>> boundedOpt = getScope()
+                    .toScopeLink() // Scope Link from inner scope to outer scope
+                    .stream()
+                    .map(scope -> bindSlot(unboundSlot, scope.getSlots()))
+                    .filter(slots -> !slots.isEmpty())
+                    .findFirst();
+            if (!boundedOpt.isPresent()) {
+                throw new AnalysisException("Cannot resolve " + unboundSlot.toString());
+            }
+            List<Slot> bounded = boundedOpt.get();
             switch (bounded.size()) {
-                case 0:
-                    throw new AnalysisException("Cannot resolve " + unboundSlot.toString());
                 case 1:
                     return bounded.get(0);
                 default:
                     throw new AnalysisException(unboundSlot + " is ambiguous: "
-                        + bounded.stream()
+                            + bounded.stream()
                             .map(Slot::toString)
                             .collect(Collectors.joining(", ")));
             }
@@ -166,7 +186,7 @@ public class BindSlotReference implements AnalysisRuleFactory {
             List<String> qualifier = unboundStar.getQualifier();
             switch (qualifier.size()) {
                 case 0: // select *
-                    return new BoundStar(boundSlots);
+                    return new BoundStar(getScope().getSlots());
                 case 1: // select table.*
                 case 2: // select db.table.*
                     return bindQualifiedStar(qualifier, context);
@@ -179,7 +199,7 @@ public class BindSlotReference implements AnalysisRuleFactory {
         private BoundStar bindQualifiedStar(List<String> qualifierStar, Void context) {
             // FIXME: compatible with previous behavior:
             // https://github.com/apache/doris/pull/10415/files/3fe9cb0c3f805ab3a9678033b281b16ad93ec60a#r910239452
-            List<Slot> slots = boundSlots.stream().filter(boundSlot -> {
+            List<Slot> slots = getScope().getSlots().stream().filter(boundSlot -> {
                 switch (qualifierStar.size()) {
                     // table.*
                     case 1:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/Scope.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/Scope.java
new file mode 100644
index 0000000000..a9df300964
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/Scope.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.nereids.trees.expressions.Slot;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * The slot range required for expression analyze.
+ */
+public class Scope {
+    private final Optional<Scope> outerScope;
+    private final List<Slot> slots;
+
+    public Scope(Optional<Scope> outerScope, List<Slot> slots) {
+        this.outerScope = outerScope;
+        this.slots = slots;
+    }
+
+    public Scope(List<Slot> slots) {
+        this.outerScope = Optional.empty();
+        this.slots = slots;
+    }
+
+    public List<Slot> getSlots() {
+        return slots;
+    }
+
+    public Optional<Scope> getOuterScope() {
+        return outerScope;
+    }
+
+    /**
+     * generate scope link from inner to outer.
+     */
+    public List<Scope> toScopeLink() {
+        Scope scope = this;
+        Builder<Scope> builder = ImmutableList.<Scope>builder().add(scope);
+        while (scope.getOuterScope().isPresent()) {
+            scope = scope.getOuterScope().get();
+            builder.add(scope);
+        }
+        return builder.build();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/DefaultSubExprRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/DefaultSubExprRewriter.java
new file mode 100644
index 0000000000..0f5c706662
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/DefaultSubExprRewriter.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.visitor;
+
+import org.apache.doris.nereids.analyzer.NereidsAnalyzer;
+import org.apache.doris.nereids.rules.analysis.Scope;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Optional;
+
+/**
+ * Use the visitor to iterate sub expression.
+ */
+public class DefaultSubExprRewriter<C> extends DefaultExpressionRewriter<C> {
+    private final Scope scope;
+
+    public DefaultSubExprRewriter(Scope scope) {
+        this.scope = scope;
+    }
+
+    @Override
+    public Expression visitSubqueryExpr(SubqueryExpr expr, C context) {
+        return new SubqueryExpr(analyzeSubquery(expr));
+    }
+
+    @Override
+    public Expression visitInSubquery(InSubquery expr, C context) {
+        return new InSubquery(expr.getCompareExpr(), analyzeSubquery(expr));
+    }
+
+    private LogicalPlan analyzeSubquery(SubqueryExpr expr) {
+        NereidsAnalyzer subAnalyzer = new NereidsAnalyzer(ConnectContext.get());
+        LogicalPlan analyzed = subAnalyzer.analyze(
+                expr.getQueryPlan(), Optional.ofNullable(scope));
+        return analyzed;
+    }
+
+    public Scope getScope() {
+        return scope;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index a7feb3d3dc..5a35e9b21f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -32,6 +32,9 @@ public enum PlanType {
     LOGICAL_AGGREGATE,
     LOGICAL_SORT,
     LOGICAL_OLAP_SCAN,
+    LOGICAL_APPLY,
+    LOGICAL_CORRELATED_JOIN,
+    LOGICAL_ENFORCE_SINGLE_ROW,
     GROUP_PLAN,
 
     // physical plan
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalApply.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalApply.java
new file mode 100644
index 0000000000..2323c46e6f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalApply.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Apply Node for subquery.
+ * Use this node to display the subquery in the relational algebra tree.
+ * refer to "Orthogonal Optimization of Subqueries and Aggregation"
+ *
+ * @param <LEFT_CHILD_TYPE> input
+ * @param <RIGHT_CHILD_TYPE> subquery
+ */
+public class LogicalApply<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE extends Plan>
+        extends LogicalBinary<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
+    // correlation : subqueries and outer associated columns
+    private final List<Expression> correlation;
+
+    public LogicalApply(LEFT_CHILD_TYPE input, RIGHT_CHILD_TYPE subquery, List<Expression> correlation) {
+        this(Optional.empty(), Optional.empty(), input, subquery, correlation);
+    }
+
+    public LogicalApply(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
+            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild, List<Expression> correlation) {
+        super(PlanType.LOGICAL_APPLY, groupExpression, logicalProperties, leftChild, rightChild);
+        this.correlation = ImmutableList.copyOf(correlation);
+    }
+
+    public List<Expression> getCorrelation() {
+        return correlation;
+    }
+
+    @Override
+    public List<Slot> computeOutput(Plan left, Plan right) {
+        return ImmutableList.<Slot>builder()
+                .addAll(left.getOutput())
+                .addAll(right.getOutput())
+                .build();
+    }
+
+    @Override
+    public String toString() {
+        return "LogicalApply (" + this.child(1).toString() + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LogicalApply that = (LogicalApply) o;
+        return Objects.equals(correlation, that.getCorrelation());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(correlation);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalApply((LogicalApply<Plan, Plan>) this, context);
+    }
+
+    @Override
+    public List<Expression> getExpressions() {
+        return correlation;
+    }
+
+    @Override
+    public LogicalBinary<Plan, Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new LogicalApply<>(children.get(0), children.get(1), correlation);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalApply<>(groupExpression, Optional.of(logicalProperties), left(), right(), correlation);
+    }
+
+    @Override
+    public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
+        return new LogicalApply<>(Optional.empty(), logicalProperties, left(), right(), correlation);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCorrelatedJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCorrelatedJoin.java
new file mode 100644
index 0000000000..45ae86e440
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCorrelatedJoin.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A relational algebra node with join type converted from apply node to subquery.
+ * @param <LEFT_CHILD_TYPE> input.
+ * @param <RIGHT_CHILD_TYPE> subquery.
+ */
+public class LogicalCorrelatedJoin<LEFT_CHILD_TYPE extends Plan, RIGHT_CHILD_TYPE extends Plan>
+        extends LogicalBinary<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> {
+
+    private final List<Expression> correlation;
+    private final Optional<Expression> filter;
+
+    public LogicalCorrelatedJoin(LEFT_CHILD_TYPE input, RIGHT_CHILD_TYPE subquery, List<Expression> correlation,
+            Optional<Expression> filter) {
+        this(Optional.empty(), Optional.empty(), input, subquery,
+                correlation, filter);
+    }
+
+    public LogicalCorrelatedJoin(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            LEFT_CHILD_TYPE leftChild, RIGHT_CHILD_TYPE rightChild, List<Expression> correlation,
+            Optional<Expression> filter) {
+        super(PlanType.LOGICAL_CORRELATED_JOIN, groupExpression, logicalProperties, leftChild, rightChild);
+        this.correlation = ImmutableList.copyOf(correlation);
+        this.filter = Objects.requireNonNull(filter, "filter can not be null");
+    }
+
+    public List<Expression> getCorrelation() {
+        return correlation;
+    }
+
+    public Optional<Expression> getFilter() {
+        return filter;
+    }
+
+    @Override
+    public List<Slot> computeOutput(Plan left, Plan right) {
+        return ImmutableList.<Slot>builder()
+                .addAll(left.getOutput())
+                .addAll(right.getOutput())
+                .build();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("LogicalCorrelated ((correlation: ").append(type);
+        correlation.stream().map(c -> sb.append(", ").append(c));
+        sb.append("), (filter: ");
+        filter.ifPresent(expression -> sb.append(", ").append(expression));
+        return sb.append("))").toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LogicalCorrelatedJoin that = (LogicalCorrelatedJoin) o;
+        return Objects.equals(correlation, that.getCorrelation())
+                && Objects.equals(filter, that.getFilter());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(correlation, filter);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalCorrelated((LogicalCorrelatedJoin<Plan, Plan>) this, context);
+    }
+
+    @Override
+    public List<Expression> getExpressions() {
+        return new ImmutableList.Builder<Expression>()
+                .addAll(correlation)
+                .add(filter.get())
+                .build();
+    }
+
+    @Override
+    public LogicalBinary<Plan, Plan> withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 2);
+        return new LogicalCorrelatedJoin<>(children.get(0), children.get(1),
+                correlation, filter);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalCorrelatedJoin<>(groupExpression, Optional.of(logicalProperties),
+                left(), right(), correlation, filter);
+    }
+
+    @Override
+    public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
+        return new LogicalCorrelatedJoin<>(Optional.empty(), logicalProperties,
+                left(), right(), correlation, filter);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalEnforceSingleRow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalEnforceSingleRow.java
new file mode 100644
index 0000000000..58024c3b94
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalEnforceSingleRow.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Guaranteed to return a result of 1 row.
+ * eg: select * from t1 where t1.a = (select sum(t2.b) from t2 where t1.b = t2.a);
+ * filter()
+ *   +--enforceSingleRow()
+ *     +--apply()
+ * @param <CHILD_TYPE> plan
+ */
+public class LogicalEnforceSingleRow<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> {
+
+    LogicalEnforceSingleRow(CHILD_TYPE plan) {
+        this(Optional.empty(), Optional.empty(), plan);
+    }
+
+    LogicalEnforceSingleRow(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE plan) {
+        super(PlanType.LOGICAL_ENFORCE_SINGLE_ROW, groupExpression, logicalProperties, plan);
+    }
+
+    @Override
+    public List<Slot> computeOutput(Plan child) {
+        return ImmutableList.<Slot>builder()
+                .addAll(child.getOutput())
+                .build();
+    }
+
+    @Override
+    public String toString() {
+        return "LogicalEnforceSingleRow (" + this.child(0).toString() + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalEnforceSingleRow((LogicalEnforceSingleRow<Plan>) this, context);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1);
+        return new LogicalEnforceSingleRow<>(children.get(0));
+    }
+
+    @Override
+    public List<Expression> getExpressions() {
+        return ImmutableList.of();
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalEnforceSingleRow<>(groupExpression, Optional.of(logicalProperties), child());
+    }
+
+    @Override
+    public Plan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
+        return new LogicalEnforceSingleRow<>(Optional.empty(), logicalProperties, child());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index 7ad5f43bee..ee4af751a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -21,6 +21,9 @@ import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCorrelatedJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalEnforceSingleRow;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -87,6 +90,18 @@ public abstract class PlanVisitor<R, C> {
         return visit(groupPlan, context);
     }
 
+    public R visitLogicalApply(LogicalApply<Plan, Plan> apply, C context) {
+        return visit(apply, context);
+    }
+
+    public R visitLogicalCorrelated(LogicalCorrelatedJoin<Plan, Plan> correlatedJoin, C context) {
+        return visit(correlatedJoin, context);
+    }
+
+    public R visitLogicalEnforceSingleRow(LogicalEnforceSingleRow<Plan> enforceSingleRow, C context) {
+        return visit(enforceSingleRow, context);
+    }
+
     // *******************************
     // Physical plans
     // *******************************
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/SubqueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/SubqueryTest.java
index 5a8f071684..663aea728e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/SubqueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/expressions/SubqueryTest.java
@@ -17,9 +17,7 @@
 
 package org.apache.doris.nereids.trees.expressions;
 
-import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.tpch.AnalyzeCheckTestBase;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 
 import org.junit.jupiter.api.Test;
 
@@ -43,20 +41,58 @@ public class SubqueryTest extends AnalyzeCheckTestBase {
                 + "v2 int)\n"
                 + "distributed by hash(k1) buckets 1\n"
                 + "properties('replication_num' = '1');";
-        createTables(t0, t1);
+
+        String t2 = "create table t2("
+                + "id int, \n"
+                + "k1 int, \n"
+                + "v2 int)\n"
+                + "distributed by hash(k1) buckets 1\n"
+                + "properties('replication_num' = '1');";
+        createTables(t0, t1, t2);
     }
 
     @Test
-    public void test() {
+    public void inTest() {
         String sql = "select t0.k1\n"
                 + "from t0\n"
                 + "where t0.k2 in\n"
                 + "    (select id\n"
                 + "     from t1\n"
                 + "     where t0.k2=t1.k1)";
-        NereidsParser parser = new NereidsParser();
-        LogicalPlan parsed = parser.parseSingle(sql);
-        assert parsed != null;
-        //checkAnalyze(sql);
+        checkAnalyze(sql);
+    }
+
+    @Test
+    public void existTest() {
+        String sql1 = "select * from t0 where exists (select * from t1 where t0.k1 = t1.k1);";
+        checkAnalyze(sql1);
+    }
+
+    @Test
+    public void existAndExistTest() {
+        String sql1 = "select * from t0 where exists (select * from t1 where t0.k1 = t1.k1) "
+                + "and not exists (select * from t2 where t0.id != t2.id);";
+        checkAnalyze(sql1);
+
+        // This type of sql cannot be parsed and cannot recognize t1.id.
+        // Other systems also cannot resolve such as presto.
+        String sql2 = "select * from t0 where exists (select * from t1 where t0.k1 = t1.k1) "
+                + "and not exists (select * from t2 where t1.id != t2.id);";
+        assert sql2 != null;
+    }
+
+    @Test
+    public void scalarTest() {
+        String sql = "select * from t0 where t0.id = "
+                + "(select min(t1.id) from t1 where t0.k1 = t1.k1)";
+        checkAnalyze(sql);
+    }
+
+    @Test
+    public void inScalarTest() {
+        String sql = "select * from t0 where t0.id in "
+                + "(select * from t1 where t1.k1 = "
+                + "(select * from t2 where t0.id = t2.id));";
+        checkAnalyze(sql);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org