You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/04 01:01:22 UTC

[drill] 01/02: DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 024fd8aa43e413bfa5a0f84f6c66b5402788b2bc
Author: Hanumath Rao Maduri <ha...@gmail.com>
AuthorDate: Wed Aug 1 15:41:29 2018 -0700

    DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.
    
    closes #1417
---
 .../drill/exec/planner/physical/AggPrelBase.java       |  2 +-
 .../apache/drill/exec/planner/physical/FilterPrel.java |  2 +-
 .../apache/drill/exec/planner/physical/LimitPrel.java  |  2 +-
 .../org/apache/drill/exec/planner/physical/Prel.java   |  8 +++++++-
 .../drill/exec/planner/physical/ProjectPrel.java       |  2 +-
 .../planner/physical/SelectionVectorRemoverPrel.java   |  2 +-
 .../apache/drill/exec/planner/physical/SortPrel.java   |  2 +-
 .../apache/drill/exec/planner/physical/TopNPrel.java   | 18 ++++++++++++++++--
 .../apache/drill/exec/planner/physical/UnnestPrel.java |  2 +-
 .../physical/visitor/LateralUnnestRowIDVisitor.java    |  6 +++---
 10 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index a4f51f3..ca68a7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -189,7 +189,7 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<Integer> groupingCols = Lists.newArrayList();
     groupingCols.add(0);
     for (int groupingCol : groupSet.asList()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 1c9112c..33c2944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -85,7 +85,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RexBuilder builder = this.getCluster().getRexBuilder();
     // right shift the previous field indices.
     return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 057cfae..ccbff17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -111,7 +111,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
index b72aff7..01d8e9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
@@ -56,7 +56,13 @@ public interface Prel extends DrillRelNode, Iterable<Prel> {
   SelectionVectorMode getEncoding();
   boolean needsFinalColumnReordering();
 
-  default Prel addImplicitRowIDCol(List<RelNode> children) {
+  /**
+   * If the operator is in a Lateral/Unnest pipeline, this generates a new operator which knows how to
+   * process the rows accordingly during execution.
+   * E.g. a TopNPrel is transformed into a SortPrel with a LimitPrel on top of it.
+   * Other operators like FilterPrel, ProjectPrel etc. will add an implicit row id to the output.
+   */
+  default Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
             this.getClass().getSimpleName() + " operator ");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 0a9e8bf..4d5de20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -136,7 +136,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
     RexBuilder builder = this.getCluster().getRexBuilder();
     List<RexNode> projects = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index a4cd921..3fad017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -55,7 +55,7 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     return (Prel) this.copy(this.traitSet, children);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 686e04a..aa7158a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -124,7 +124,7 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
     relFieldCollations.add(new RelFieldCollation(0,
                             RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 9bdcad0..3e407f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -18,11 +18,14 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -101,7 +104,7 @@ public class TopNPrel extends SinglePrel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
     relFieldCollations.add(new RelFieldCollation(0,
                           RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
@@ -115,6 +118,17 @@ public class TopNPrel extends SinglePrel {
                                     .replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE))
                                     .replace(collationTrait)
                                     .replace(DRILL_PHYSICAL);
-    return (Prel) this.copy(traits, children);
+    return transformTopNToSortAndLimit(children, traits, collationTrait);
+  }
+
+  private Prel transformTopNToSortAndLimit(List<RelNode> children, RelTraitSet traits, RelCollation collationTrait) {
+    SortPrel sortprel = new SortPrel(this.getCluster(), traits, children.get(0), collationTrait);
+    RexNode offset = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(0),
+            this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+    RexNode limit = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(this.limit),
+            this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+    // SMEX (SingleMergeExchange) is not needed here because the Lateral/Unnest pipeline doesn't support exchanges.
+    LimitPrel limitPrel = new LimitPrel(this.getCluster(), traits, sortprel, offset, limit, false, true);
+    return limitPrel;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
index 2331138..274f27a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -86,7 +86,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
     List<String> fieldNames = new ArrayList<>();
     List<RelDataType> fieldTypes = new ArrayList<>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
index 4692202..dc4af5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
@@ -43,7 +43,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
   public Prel visitPrel(Prel prel, Boolean isRightOfLateral) throws RuntimeException {
     List<RelNode> children = getChildren(prel, isRightOfLateral);
     if (isRightOfLateral) {
-      return prel.addImplicitRowIDCol(children);
+      return prel.prepareForLateralUnnestPipeline(children);
     } else {
       return (Prel) prel.copy(prel.getTraitSet(), children);
     }
@@ -61,7 +61,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
   @Override
   public Prel visitLateral(LateralJoinPrel prel, Boolean value) throws RuntimeException {
     List<RelNode> children = Lists.newArrayList();
-    children.add(((Prel)prel.getInput(0)).accept(this, false));
+    children.add(((Prel)prel.getInput(0)).accept(this, value));
     children.add(((Prel) prel.getInput(1)).accept(this, true));
 
     return (Prel) prel.copy(prel.getTraitSet(), children);
@@ -69,6 +69,6 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
 
   @Override
   public Prel visitUnnest(UnnestPrel prel, Boolean value) throws RuntimeException {
-    return prel.addImplicitRowIDCol(null);
+    return prel.prepareForLateralUnnestPipeline(null);
   }
 }