You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by lu...@apache.org on 2022/01/27 09:38:50 UTC
[drill] branch master updated: DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)
This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 493ac43 DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)
493ac43 is described below
commit 493ac43af92f165f31e6f6ca3182bd1f324823e3
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Jan 27 11:38:39 2022 +0200
DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)
* DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables
* DRILL-8114: Change after code review
---
.../iceberg/plan/IcebergPluginImplementor.java | 30 +++++++-------
.../exec/store/iceberg/IcebergQueriesTest.java | 48 ++++++++++++++++++++++
.../store/mongo/plan/MongoPluginImplementor.java | 40 +++++++++++-------
.../drill/exec/planner/common/DrillRelOptUtil.java | 3 ++
.../exec/store/plan/AbstractPluginImplementor.java | 16 ++++++++
5 files changed, 108 insertions(+), 29 deletions(-)
diff --git a/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java b/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
index 1ed9cba..3d85b54 100644
--- a/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
+++ b/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
@@ -22,7 +22,6 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Util;
@@ -30,12 +29,9 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillLimitRelBase;
-import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.SubsetRemover;
import org.apache.drill.exec.store.iceberg.IcebergGroupScan;
import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
@@ -97,9 +93,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
if (expression != null) {
try {
- TableScan tableScan = DrillRelOptUtil.findScan(filter.accept(SubsetRemover.INSTANCE));
- DrillTable drillTable = DrillRelOptUtil.getDrillTable(tableScan);
- GroupScan scan = drillTable.getGroupScan();
+ GroupScan scan = findGroupScan(filter);
if (scan instanceof IcebergGroupScan) {
IcebergGroupScan groupScan = (IcebergGroupScan) scan;
// ensures that expression compatible with table schema
@@ -107,7 +101,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
} else {
return false;
}
- } catch (ValidationException | IOException e) {
+ } catch (ValidationException e) {
return false;
}
}
@@ -125,11 +119,14 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
@Override
public boolean canImplement(DrillLimitRelBase limit) {
- FirstLimitFinder finder = new FirstLimitFinder();
- limit.getInput().accept(finder);
- int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
- int newLimit = getArtificialLimit(limit);
- return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+ if (hasPluginGroupScan(limit)) {
+ FirstLimitFinder finder = new FirstLimitFinder();
+ limit.getInput().accept(finder);
+ int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+ int newLimit = getArtificialLimit(limit);
+ return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+ }
+ return false;
}
@Override
@@ -144,7 +141,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
@Override
public boolean canImplement(Project project) {
- return true;
+ return hasPluginGroupScan(project);
}
@Override
@@ -152,6 +149,11 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
return groupScan;
}
+ @Override
+ protected boolean hasPluginGroupScan(RelNode node) {
+ return findGroupScan(node) instanceof IcebergGroupScan;
+ }
+
private int rexLiteralIntValue(RexLiteral offset) {
return ((BigDecimal) offset.getValue()).intValue();
}
diff --git a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
index 4e010f2..93c8ea5 100644
--- a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
+++ b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
@@ -685,4 +685,52 @@ public class IcebergQueriesTest extends ClusterTest {
.go();
}
+ @Test
+ public void testLateralSql() throws Exception {
+ String sql =
+ "SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
+ "FROM cp.`lateraljoin/nested-customer.json` t,\n" +
+ "unnest(t.orders) t2(ord)\n" +
+ "LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("c_name", "o_shop")
+ .baselineValues("customer1", "Meno Park 1st")
+ .go();
+ }
+
+ @Test
+ public void testLateralSqlIceberg() throws Exception {
+ String sql =
+ "SELECT t.int_field, t2.ord.struct_string_field struct_string_field\n" +
+ "FROM dfs.tmp.testAllTypes t,\n" +
+ "unnest(t.repeated_struct_field) t2(ord)\n" +
+ "ORDER BY t.int_field\n" +
+ "LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(sql)
+ .unOrdered()
+ .baselineColumns("int_field", "struct_string_field")
+ .baselineValues(1, "abc")
+ .go();
+ }
+
+ @Test
+ public void testFilterPushCorrelate() throws Exception {
+ String sql =
+ "SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
+ "FROM cp.`lateraljoin/nested-customer.json` t,\n" +
+ "unnest(t.orders) t2(ord)\n" +
+ "WHERE t.c_name='customer1' AND t2.ord.o_shop='Meno Park 1st'";
+
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(sql)
+ .baselineColumns("c_name", "o_shop")
+ .baselineValues("customer1", "Meno Park 1st")
+ .go();
+ }
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index 5fae342..64c9b4e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -231,7 +231,8 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
@Override
public boolean canImplement(Aggregate aggregate) {
- return aggregate.getGroupType() == Aggregate.Group.SIMPLE
+ return hasPluginGroupScan(aggregate)
+ && aggregate.getGroupType() == Aggregate.Group.SIMPLE
&& aggregate.getAggCallList().stream()
.noneMatch(AggregateCall::isDistinct)
&& aggregate.getAggCallList().stream()
@@ -240,41 +241,45 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
@Override
public boolean canImplement(Filter filter) {
- LogicalExpression conditionExp = DrillOptiq.toDrill(
- new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
- filter.getInput(),
- filter.getCondition());
- MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
- filterBuilder.parseTree();
- return filterBuilder.isAllExpressionsConverted();
+ if (hasPluginGroupScan(filter)) {
+ LogicalExpression conditionExp = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+ filter.getInput(),
+ filter.getCondition());
+ MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
+ filterBuilder.parseTree();
+ return filterBuilder.isAllExpressionsConverted();
+ }
+ return false;
}
@Override
public boolean canImplement(DrillLimitRelBase limit) {
- return true;
+ return hasPluginGroupScan(limit);
}
@Override
public boolean canImplement(Project project) {
- return project.getProjects().stream()
- .allMatch(RexToMongoTranslator::supportsExpression);
+ return hasPluginGroupScan(project) &&
+ project.getProjects().stream()
+ .allMatch(RexToMongoTranslator::supportsExpression);
}
@Override
public boolean canImplement(Sort sort) {
- return true;
+ return hasPluginGroupScan(sort);
}
@Override
public boolean canImplement(Union union) {
// allow converting for union all only, since Drill adds extra aggregation for union distinct,
// so we will convert both union all and aggregation later
- return union.all;
+ return union.all && hasPluginGroupScan(union);
}
@Override
public boolean canImplement(TableScan scan) {
- return true;
+ return hasPluginGroupScan(scan);
}
@Override
@@ -292,7 +297,12 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
newSpec, columns, runAggregate);
}
- public static int direction(RelFieldCollation fieldCollation) {
+ @Override
+ protected boolean hasPluginGroupScan(RelNode node) {
+ return findGroupScan(node) instanceof MongoGroupScan;
+ }
+
+ private static int direction(RelFieldCollation fieldCollation) {
switch (fieldCollation.getDirection()) {
case DESCENDING:
case STRICTLY_DESCENDING:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index ec9bf4e..d642154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -325,6 +325,9 @@ public abstract class DrillRelOptUtil {
for (RelNode rel : rels) {
if (rel instanceof TableScan) {
return (TableScan) rel;
+ } else if (rel instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) rel;
+ return findScan(Util.first(relSubset.getBest(), relSubset.getOriginal()));
} else {
return findScan(rel.getInputs().toArray(new RelNode[0]));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
index 6b49be9..ce3a1e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.plan;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
@@ -25,7 +26,11 @@ import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Union;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.function.CheckedFunction;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
@@ -38,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Optional;
/**
* Abstract base implementation of {@link PluginImplementor} that can be used by
@@ -141,4 +147,14 @@ public abstract class AbstractPluginImplementor implements PluginImplementor {
.message("Plugin implementor doesn't support push down for %s", rel)
.build(logger);
}
+
+ protected GroupScan findGroupScan(RelNode node) {
+ CheckedFunction<DrillTable, GroupScan, IOException> groupScanFunction = DrillTable::getGroupScan;
+ return Optional.ofNullable(DrillRelOptUtil.findScan(node))
+ .map(DrillRelOptUtil::getDrillTable)
+ .map(groupScanFunction)
+ .orElse(null);
+ }
+
+ protected abstract boolean hasPluginGroupScan(RelNode node);
}