You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/04 01:01:22 UTC
[drill] 01/02: DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 024fd8aa43e413bfa5a0f84f6c66b5402788b2bc
Author: Hanumath Rao Maduri <ha...@gmail.com>
AuthorDate: Wed Aug 1 15:41:29 2018 -0700
DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.
closes #1417
---
.../drill/exec/planner/physical/AggPrelBase.java | 2 +-
.../apache/drill/exec/planner/physical/FilterPrel.java | 2 +-
.../apache/drill/exec/planner/physical/LimitPrel.java | 2 +-
.../org/apache/drill/exec/planner/physical/Prel.java | 8 +++++++-
.../drill/exec/planner/physical/ProjectPrel.java | 2 +-
.../planner/physical/SelectionVectorRemoverPrel.java | 2 +-
.../apache/drill/exec/planner/physical/SortPrel.java | 2 +-
.../apache/drill/exec/planner/physical/TopNPrel.java | 18 ++++++++++++++++--
.../apache/drill/exec/planner/physical/UnnestPrel.java | 2 +-
.../physical/visitor/LateralUnnestRowIDVisitor.java | 6 +++---
10 files changed, 33 insertions(+), 13 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index a4f51f3..ca68a7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -189,7 +189,7 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
List<Integer> groupingCols = Lists.newArrayList();
groupingCols.add(0);
for (int groupingCol : groupSet.asList()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 1c9112c..33c2944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -85,7 +85,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
RexBuilder builder = this.getCluster().getRexBuilder();
// right shift the previous field indices.
return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 057cfae..ccbff17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -111,7 +111,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
index b72aff7..01d8e9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
@@ -56,7 +56,13 @@ public interface Prel extends DrillRelNode, Iterable<Prel> {
SelectionVectorMode getEncoding();
boolean needsFinalColumnReordering();
- default Prel addImplicitRowIDCol(List<RelNode> children) {
+ /**
+ * If the operator is in a Lateral/Unnest pipeline, this generates a new operator that knows how to process
+ * the rows accordingly during execution.
+ * E.g., TopNPrel is replaced by SortPrel and LimitPrel.
+ * Other operators such as FilterPrel, ProjectPrel, etc. add an implicit row id to the output.
+ */
+ default Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
this.getClass().getSimpleName() + " operator ");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 0a9e8bf..4d5de20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -136,7 +136,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
RexBuilder builder = this.getCluster().getRexBuilder();
List<RexNode> projects = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index a4cd921..3fad017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -55,7 +55,7 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
return (Prel) this.copy(this.traitSet, children);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 686e04a..aa7158a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -124,7 +124,7 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
relFieldCollations.add(new RelFieldCollation(0,
RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 9bdcad0..3e407f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -18,11 +18,14 @@
package org.apache.drill.exec.planner.physical;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -101,7 +104,7 @@ public class TopNPrel extends SinglePrel {
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
relFieldCollations.add(new RelFieldCollation(0,
RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
@@ -115,6 +118,17 @@ public class TopNPrel extends SinglePrel {
.replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE))
.replace(collationTrait)
.replace(DRILL_PHYSICAL);
- return (Prel) this.copy(traits, children);
+ return transformTopNToSortAndLimit(children, traits, collationTrait);
+ }
+
+ private Prel transformTopNToSortAndLimit(List<RelNode> children, RelTraitSet traits, RelCollation collationTrait) {
+ SortPrel sortprel = new SortPrel(this.getCluster(), traits, children.get(0), collationTrait);
+ RexNode offset = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(0),
+ this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ RexNode limit = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(this.limit),
+ this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ //SMEX is not needed here because Lateral/Unnest pipeline doesn't support exchanges.
+ LimitPrel limitPrel = new LimitPrel(this.getCluster(), traits, sortprel, offset, limit, false, true);
+ return limitPrel;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
index 2331138..274f27a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -86,7 +86,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel {
}
@Override
- public Prel addImplicitRowIDCol(List<RelNode> children) {
+ public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
List<String> fieldNames = new ArrayList<>();
List<RelDataType> fieldTypes = new ArrayList<>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
index 4692202..dc4af5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
@@ -43,7 +43,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
public Prel visitPrel(Prel prel, Boolean isRightOfLateral) throws RuntimeException {
List<RelNode> children = getChildren(prel, isRightOfLateral);
if (isRightOfLateral) {
- return prel.addImplicitRowIDCol(children);
+ return prel.prepareForLateralUnnestPipeline(children);
} else {
return (Prel) prel.copy(prel.getTraitSet(), children);
}
@@ -61,7 +61,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
@Override
public Prel visitLateral(LateralJoinPrel prel, Boolean value) throws RuntimeException {
List<RelNode> children = Lists.newArrayList();
- children.add(((Prel)prel.getInput(0)).accept(this, false));
+ children.add(((Prel)prel.getInput(0)).accept(this, value));
children.add(((Prel) prel.getInput(1)).accept(this, true));
return (Prel) prel.copy(prel.getTraitSet(), children);
@@ -69,6 +69,6 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
@Override
public Prel visitUnnest(UnnestPrel prel, Boolean value) throws RuntimeException {
- return prel.addImplicitRowIDCol(null);
+ return prel.prepareForLateralUnnestPipeline(null);
}
}