You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/08/27 08:25:35 UTC

[drill] 04/06: DRILL-6703: Query with complex expressions in lateral and unnest fails with CannotPlanException

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a1f3f9a7ee06d14e1932374e928853480e86dd90
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Aug 22 13:02:33 2018 +0300

    DRILL-6703: Query with complex expressions in lateral and unnest fails with CannotPlanException
    
    git closes #1441
---
 .../apache/drill/exec/planner/PlannerPhase.java    |   3 -
 ...rojectComplexRexNodeCorrelateTransposeRule.java | 154 ----------------
 .../planner/sql/handlers/ComplexUnnestVisitor.java | 199 +++++++++++++++++++++
 .../planner/sql/handlers/DefaultSqlHandler.java    |  33 ++--
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  39 ++++
 5 files changed, 252 insertions(+), 176 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 2d02011..168ff9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.DrillJoinRule;
 import org.apache.drill.exec.planner.logical.DrillLimitRule;
 import org.apache.drill.exec.planner.logical.DrillMergeProjectRule;
-import org.apache.drill.exec.planner.logical.ProjectComplexRexNodeCorrelateTransposeRule;
 import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule;
 import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule;
 import org.apache.drill.exec.planner.logical.DrillProjectRule;
@@ -312,8 +311,6 @@ public enum PlannerPhase {
       RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE,
       DrillPushProjectIntoScanRule.INSTANCE,
 
-      ProjectComplexRexNodeCorrelateTransposeRule.INSTANCE,
-
       /*
        Convert from Calcite Logical to Drill Logical Rules.
        */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ProjectComplexRexNodeCorrelateTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ProjectComplexRexNodeCorrelateTransposeRule.java
deleted file mode 100644
index a979d5b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ProjectComplexRexNodeCorrelateTransposeRule.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Uncollect;
-import org.apache.calcite.rel.logical.LogicalCorrelate;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.trace.CalciteTrace;
-import org.apache.drill.common.exceptions.UserException;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Rule that moves non-{@link RexFieldAccess} rex node from project below {@link Uncollect}
- * to the left side of the {@link Correlate}.
- */
-public class ProjectComplexRexNodeCorrelateTransposeRule extends RelOptRule {
-
-  public static final RelOptRule INSTANCE = new ProjectComplexRexNodeCorrelateTransposeRule();
-
-  public ProjectComplexRexNodeCorrelateTransposeRule() {
-    super(operand(LogicalCorrelate.class,
-        operand(RelNode.class, any()),
-        operand(Uncollect.class, operand(LogicalProject.class, any()))),
-        DrillRelFactories.LOGICAL_BUILDER,
-        "ProjectComplexRexNodeCorrelateTransposeRule");
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final Correlate correlate = call.rel(0);
-    final Uncollect uncollect = call.rel(2);
-    final LogicalProject project = call.rel(3);
-
-    // uncollect requires project with single expression
-    RexNode projectedNode = project.getProjects().iterator().next();
-
-    // check that the expression is complex call
-    if (!(projectedNode instanceof RexFieldAccess)) {
-      RelBuilder builder = call.builder();
-      RexBuilder rexBuilder = builder.getRexBuilder();
-
-      builder.push(correlate.getLeft());
-
-      // creates project with complex expr on top of the left side
-      List<RexNode> leftProjExprs = new ArrayList<>();
-
-      String complexFieldName = correlate.getRowType().getFieldNames()
-            .get(correlate.getRowType().getFieldNames().size() - 1);
-
-      List<String> fieldNames = new ArrayList<>();
-      for (RelDataTypeField field : correlate.getLeft().getRowType().getFieldList()) {
-        leftProjExprs.add(rexBuilder.makeInputRef(correlate.getLeft(), field.getIndex()));
-        fieldNames.add(field.getName());
-      }
-      fieldNames.add(complexFieldName);
-      List<RexNode> topProjectExpressions = new ArrayList<>(leftProjExprs);
-
-      // adds complex expression with replaced correlation
-      // to the projected list from the left
-      leftProjExprs.add(projectedNode.accept(new RexFieldAccessReplacer(builder)));
-
-      RelNode leftProject = builder.project(leftProjExprs, fieldNames)
-          .build();
-
-      CorrelationId correlationId = correlate.getCluster().createCorrel();
-      RexCorrelVariable rexCorrel =
-          (RexCorrelVariable) rexBuilder.makeCorrel(
-              leftProject.getRowType(),
-              correlationId);
-      builder.push(project.getInput());
-      RelNode rightProject = builder.project(
-              ImmutableList.of(rexBuilder.makeFieldAccess(rexCorrel, leftProjExprs.size() - 1)),
-              ImmutableList.of(complexFieldName))
-          .build();
-
-      int requiredColumnsCount = correlate.getRequiredColumns().cardinality();
-      if (requiredColumnsCount != 1) {
-        throw UserException.planError()
-            .message("Required columns count for Correlate operator " +
-                "differs from the expected value:\n" +
-                "Expected columns count is %s, but actual is %s",
-                1, requiredColumnsCount)
-            .build(CalciteTrace.getPlannerTracer());
-      }
-
-      RelNode newUncollect = uncollect.copy(uncollect.getTraitSet(), rightProject);
-      Correlate newCorrelate = correlate.copy(uncollect.getTraitSet(), leftProject, newUncollect,
-          correlationId, ImmutableBitSet.of(leftProjExprs.size() - 1), correlate.getJoinType());
-      builder.push(newCorrelate);
-
-      switch(correlate.getJoinType()) {
-        case LEFT:
-        case INNER:
-          // adds field from the right input of correlate to the top project
-          topProjectExpressions.add(
-              rexBuilder.makeInputRef(newCorrelate, topProjectExpressions.size() + 1));
-          // fall through
-        case ANTI:
-        case SEMI:
-          builder.project(topProjectExpressions, correlate.getRowType().getFieldNames());
-      }
-
-      call.transformTo(builder.build());
-    }
-  }
-
-  /**
-   * Visitor for RexNode which replaces {@link RexFieldAccess}
-   * with a reference to the field used in {@link RexFieldAccess}.
-   */
-  private static class RexFieldAccessReplacer extends RexShuttle {
-    private final RelBuilder builder;
-
-    public RexFieldAccessReplacer(RelBuilder builder) {
-      this.builder = builder;
-    }
-
-    @Override
-    public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
-      return builder.field(fieldAccess.getField().getName());
-    }
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ComplexUnnestVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ComplexUnnestVisitor.java
new file mode 100644
index 0000000..0134377
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ComplexUnnestVisitor.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Visitor that moves non-{@link RexFieldAccess} rex node from project below {@link Uncollect}
+ * to the left side of the {@link Correlate}.
+ * <p>
+ * While visiting {@link Uncollect}, the complex expression is added as an additional field
+ * to the project created on top of the left input of the correlate, and the project below
+ * {@link Uncollect} is replaced with the project which reads this field using new correlation
+ * variable. While visiting {@link LogicalCorrelate}, the correlate is re-created for the new
+ * inputs and wrapped into the project which hides the additional field, so the rewritten tree
+ * exposes the same field names as the original correlate.
+ */
+public class ComplexUnnestVisitor extends RelShuttleImpl {
+  // name of the additional field with the complex expression moved to the left side
+  private static final String COMPLEX_FIELD_NAME = "$COMPLEX_FIELD_NAME";
+
+  // current left input of every visited correlate; the entry is replaced with the project
+  // which contains the complex expression once Uncollect below the correlate is rewritten
+  private final Map<CorrelationId, RelNode> leftInputs = new HashMap<>();
+  // original CorrelationId -> CorrelationId created when rewriting Uncollect
+  private final Map<CorrelationId, CorrelationId> updatedCorrelationIds = new HashMap<>();
+
+  // instances are created only in rewriteUnnestWithComplexExprs()
+  private ComplexUnnestVisitor() {
+  }
+
+  /**
+   * Visits inputs of the correlate and, if {@link Uncollect} which refers to this correlate
+   * was rewritten, creates new correlate with the project with complex expression as left input.
+   *
+   * @param correlate correlate to be visited
+   * @return the same correlate if its inputs were not changed, its copy with new inputs
+   *         if {@link CorrelationId} may be preserved, otherwise project on top of the new
+   *         correlate which returns the same fields as the original correlate
+   */
+  @Override
+  public RelNode visit(LogicalCorrelate correlate) {
+    RelNode left = correlate.getLeft().accept(this);
+    leftInputs.put(correlate.getCorrelationId(), left);
+
+    RelNode right = correlate.getRight().accept(this);
+    // if right input wasn't changed or left input wasn't changed
+    // after rewriting right input, no need to create Correlate with new CorrelationId
+    if (correlate.getRight() == right
+        || left == leftInputs.get(correlate.getCorrelationId())) {
+      // original Correlate may be reused only if both inputs are the same; right input
+      // may be changed even if left one is not, e.g. when nested Correlate was rewritten
+      if (correlate.getLeft() == left && correlate.getRight() == right) {
+        return correlate;
+      }
+      // changed only inputs, but CorrelationId left the same
+      return correlate.copy(correlate.getTraitSet(), Arrays.asList(left, right));
+    }
+
+    // index of the field with complex expression in the new left input,
+    // it is placed right after the fields of the original left input
+    int complexFieldIndex = left.getRowType().getFieldCount();
+    Correlate newCorrelate = correlate.copy(correlate.getTraitSet(),
+        leftInputs.get(correlate.getCorrelationId()), right,
+        updatedCorrelationIds.get(correlate.getCorrelationId()),
+        ImmutableBitSet.of(complexFieldIndex), correlate.getJoinType());
+
+    RelBuilder builder = DrillRelFactories.LOGICAL_BUILDER.create(correlate.getCluster(), null);
+    RexBuilder rexBuilder = builder.getRexBuilder();
+    builder.push(newCorrelate);
+
+    List<RexNode> topProjectExpressions = left.getRowType().getFieldList().stream()
+        .map(field -> rexBuilder.makeInputRef(left, field.getIndex()))
+        .collect(Collectors.toList());
+
+    switch (correlate.getJoinType()) {
+      case LEFT:
+      case INNER:
+        // adds all the fields from the right input of correlate to the top project,
+        // they are placed after the field with complex expression
+        for (int i = 0; i < right.getRowType().getFieldCount(); i++) {
+          topProjectExpressions.add(
+              rexBuilder.makeInputRef(newCorrelate, complexFieldIndex + 1 + i));
+        }
+        // fall through
+      case ANTI:
+      case SEMI:
+        builder.project(topProjectExpressions, correlate.getRowType().getFieldNames());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Passes {@link Uncollect} instances to {@link #visit(Uncollect)},
+   * all other rel nodes are handled by the parent class.
+   *
+   * @param other rel node to be visited
+   * @return result of visiting the rel node
+   */
+  @Override
+  public RelNode visit(RelNode other) {
+    if (other instanceof Uncollect) {
+      return visit((Uncollect) other);
+    }
+    return super.visit(other);
+  }
+
+  /**
+   * Rewrites {@link Uncollect} whose project contains an expression other than field access:
+   * the expression is added to the project on top of the left input of the correlate
+   * referred by the expression, and the project below {@link Uncollect} is replaced
+   * with the project which accesses the added field using new correlation variable.
+   * The new left input and new {@link CorrelationId} are stored to be used
+   * when the corresponding correlate is re-created.
+   *
+   * @param uncollect uncollect to be visited
+   * @return the same uncollect if its project contains field access,
+   *         otherwise copy of the uncollect with new project as its input
+   */
+  public RelNode visit(Uncollect uncollect) {
+    RelBuilder builder = DrillRelFactories.LOGICAL_BUILDER.create(uncollect.getCluster(), null);
+    RexBuilder rexBuilder = builder.getRexBuilder();
+
+    assert uncollect.getInput() instanceof Project : "Uncollect should have Project input";
+
+    Project project = (Project) uncollect.getInput();
+    // If project below uncollect contains only field references, no need to rewrite it
+    List<RexNode> projectChildExps = project.getChildExps();
+    assert projectChildExps.size() == 1 : "Uncollect does not support multiple expressions";
+
+    RexNode projectExpr = projectChildExps.iterator().next();
+    if (projectExpr.getKind() == SqlKind.FIELD_ACCESS) {
+      return uncollect;
+    }
+
+    // Collects CorrelationId instances used in current rel node
+    RelOptUtil.VariableUsedVisitor variableUsedVisitor = new RelOptUtil.VariableUsedVisitor(null);
+    project.accept(variableUsedVisitor);
+
+    assert variableUsedVisitor.variables.size() == 1 : "Uncollect supports only single correlated reference";
+
+    CorrelationId oldCorrId = variableUsedVisitor.variables.iterator().next();
+    RelNode left = leftInputs.get(oldCorrId);
+
+    // Creates new project to be placed on top of the left input of correlate
+    List<RexNode> leftProjExprs = new ArrayList<>();
+
+    List<String> fieldNames = new ArrayList<>();
+    for (RelDataTypeField field : left.getRowType().getFieldList()) {
+      leftProjExprs.add(rexBuilder.makeInputRef(left, field.getIndex()));
+      fieldNames.add(field.getName());
+    }
+    fieldNames.add(COMPLEX_FIELD_NAME);
+
+    builder.push(left);
+
+    // Adds complex expression with replaced correlation
+    // to the projected list from the left
+    leftProjExprs.add(new RexFieldAccessReplacer(builder).apply(projectExpr));
+
+    RelNode leftProject =
+        builder.project(leftProjExprs, fieldNames)
+            .build();
+    leftInputs.put(oldCorrId, leftProject);
+
+    builder.push(project.getInput());
+
+    CorrelationId newCorrId = uncollect.getCluster().createCorrel();
+    // stores new CorrelationId to be used during the creation of new Correlate
+    updatedCorrelationIds.put(oldCorrId, newCorrId);
+
+    RexNode rexCorrel = rexBuilder.makeCorrel(leftProject.getRowType(), newCorrId);
+
+    // constructs Project below Uncollect with updated RexCorrelVariable
+    builder.project(
+        ImmutableList.of(rexBuilder.makeFieldAccess(rexCorrel, leftProjExprs.size() - 1)),
+        ImmutableList.of(COMPLEX_FIELD_NAME));
+    return uncollect.copy(uncollect.getTraitSet(), builder.build());
+  }
+
+  /**
+   * Rewrites rel node tree and moves non-{@link RexFieldAccess} rex node from the project
+   * below {@link Uncollect} to the left side of the {@link Correlate}.
+   *
+   * @param relNode tree to be rewritten
+   * @return rewritten rel node tree
+   */
+  public static RelNode rewriteUnnestWithComplexExprs(RelNode relNode) {
+    ComplexUnnestVisitor visitor = new ComplexUnnestVisitor();
+    return relNode.accept(visitor);
+  }
+
+  /**
+   * Visitor for RexNode which replaces {@link RexFieldAccess}
+   * with a reference to the field used in {@link RexFieldAccess}.
+   */
+  private static class RexFieldAccessReplacer extends RexShuttle {
+    private final RelBuilder builder;
+
+    public RexFieldAccessReplacer(RelBuilder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+      return builder.field(fieldAccess.getField().getName());
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0d9793a..5ba5640 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -29,8 +29,6 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.apache.calcite.plan.RelOptCostImpl;
-import org.apache.calcite.plan.RelOptLattice;
-import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
@@ -111,7 +109,6 @@ import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 import org.apache.drill.exec.work.foreman.UnsupportedRelOperatorException;
 import org.slf4j.Logger;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
@@ -164,7 +161,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     }
   }
 
-  protected void log(final String name, final PhysicalPlan plan, final Logger logger) throws JsonProcessingException {
+  protected void log(final String name, final PhysicalPlan plan, final Logger logger) {
     if (logger.isDebugEnabled()) {
       PropertyFilter filter = new SimpleBeanPropertyFilter.SerializeExceptFilter(Sets.newHashSet("password"));
       String planText = plan.unparse(context.getLpPersistence().getMapper()
@@ -193,9 +190,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
    * Rewrite the parse tree. Used before validating the parse tree. Useful if a particular statement needs to converted
    * into another statement.
    *
-   * @param node
+   * @param node sql parse tree to be rewritten
    * @return Rewritten sql parse tree
-   * @throws RelConversionException
    */
   protected SqlNode rewrite(SqlNode node) throws RelConversionException, ForemanSetupException {
     return node;
@@ -217,10 +213,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
    *
    * @param relNode relational node
    * @return Drill Logical RelNode tree
-   * @throws SqlUnsupportedException
-   * @throws RelConversionException
+   * @throws SqlUnsupportedException if query cannot be planned
    */
-  protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
+  protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException {
     if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
         context.getPlannerSettings().isTypeInferenceEnabled() &&
         FindLimit0Visitor.containsLimit0(relNode)) {
@@ -299,7 +294,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     } catch (RelOptPlanner.CannotPlanException ex) {
       logger.error(ex.getMessage());
 
-      if(JoinUtils.checkCartesianJoin(relNode, new ArrayList<Integer>(), new ArrayList<Integer>(), new ArrayList<Boolean>())) {
+      if (JoinUtils.checkCartesianJoin(relNode, new ArrayList<>(), new ArrayList<>(), new ArrayList<>())) {
         throw new UnsupportedRelOperatorException("This query cannot be planned possibly due to either a cartesian join or an inequality join");
       } else {
         throw ex;
@@ -313,10 +308,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
    *
    * @param relNode root RelNode corresponds to Calcite Logical RelNode.
    * @return Drill Logical RelNode tree
-   * @throws RelConversionException
-   * @throws SqlUnsupportedException
+   * @throws SqlUnsupportedException if query cannot be planned
    */
-  protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException, SqlUnsupportedException {
+  protected DrillRel convertToDrel(RelNode relNode) throws SqlUnsupportedException {
     final DrillRel convertedRelNode = convertToRawDrel(relNode);
 
     return new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(),
@@ -429,7 +423,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
           "Cluster is expected to be constructed using VolcanoPlanner. Was actually of type %s.", planner.getClass()
               .getName());
       output = program.run(planner, input, toTraits,
-          ImmutableList.<RelOptMaterialization>of(), ImmutableList.<RelOptLattice>of());
+          ImmutableList.of(), ImmutableList.of());
 
       break;
     }
@@ -465,7 +459,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     } catch (RelOptPlanner.CannotPlanException ex) {
       logger.error(ex.getMessage());
 
-      if(JoinUtils.checkCartesianJoin(drel, new ArrayList<Integer>(), new ArrayList<Integer>(), new ArrayList<Boolean>())) {
+      if (JoinUtils.checkCartesianJoin(drel, new ArrayList<>(), new ArrayList<>(), new ArrayList<>())) {
         throw new UnsupportedRelOperatorException("This query cannot be planned possibly due to either a cartesian join or an inequality join");
       } else {
         throw ex;
@@ -488,7 +482,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       } catch (RelOptPlanner.CannotPlanException ex) {
         logger.error(ex.getMessage());
 
-        if(JoinUtils.checkCartesianJoin(drel, new ArrayList<Integer>(), new ArrayList<Integer>(), new ArrayList<Boolean>())) {
+        if (JoinUtils.checkCartesianJoin(drel, new ArrayList<>(), new ArrayList<>(), new ArrayList<>())) {
           throw new UnsupportedRelOperatorException("This query cannot be planned possibly due to either a cartesian join or an inequality join");
         } else {
           throw ex;
@@ -526,8 +520,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * We want to have smaller dataset on the right side, since hash table builds on right side.
      */
     if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
-      phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode, Double.valueOf(context.getPlannerSettings()
-          .getHashJoinSwapMarginFactor()));
+      phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode,
+          context.getPlannerSettings().getHashJoinSwapMarginFactor());
     }
 
     /* Parquet row group filter pushdown in planning time */
@@ -710,7 +704,8 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
       throw ex;
     }
 
-    return rel;
+    // moves complex expressions below Uncollect to the left side of Correlate
+    return ComplexUnnestVisitor.rewriteUnnestWithComplexExprs(rel);
   }
 
   protected DrillRel addRenamedProject(DrillRel rel, RelDataType validatedRowType) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 0283ade..6578d0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -656,4 +656,55 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
         .go();
   }
 
+  /**
+   * Checks lateral subquery which projects a nested field of the items
+   * returned by unnest with a complex expression ({@code c.orders[0].items}).
+   */
+  @Test
+  public void testLateralWithComplexProject() throws Exception {
+    String sql = "select l.name from cp.`lateraljoin/nested-customer.parquet` c,\n" +
+        "lateral (select u.item.i_name as name from unnest(c.orders[0].items) as u(item)) l limit 1";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("name")
+        .baselineValues("paper towel")
+        .go();
+  }
+
+  /**
+   * Checks lateral subquery which aggregates a nested field of the items
+   * returned by unnest with a complex expression ({@code c.orders[0].items}).
+   */
+  @Test
+  public void testLateralWithAgg() throws Exception {
+    String sql = "select l.name from cp.`lateraljoin/nested-customer.parquet` c,\n" +
+        "lateral (select max(u.item.i_name) as name from unnest(c.orders[0].items) as u(item)) l limit 1";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("name")
+        .baselineValues("paper towel")
+        .go();
+  }
+
+  /**
+   * Checks query with two lateral subqueries, each of them uses unnest
+   * with the same complex expression ({@code c.orders[0].items}).
+   */
+  @Test
+  public void testMultiLateralWithComplexProject() throws Exception {
+    String sql = "select l1.name, l2.name as name2 from cp.`lateraljoin/nested-customer.parquet` c,\n" +
+        "lateral (select u.item.i_name as name from unnest(c.orders[0].items) as u(item)) l1," +
+        "lateral (select u.item.i_name as name from unnest(c.orders[0].items) as u(item)) l2 limit 1";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("name", "name2")
+        .baselineValues("paper towel", "paper towel")
+        .go();
+  }
 }