You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by lu...@apache.org on 2022/01/27 09:38:50 UTC

[drill] branch master updated: DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)

This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 493ac43  DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)
493ac43 is described below

commit 493ac43af92f165f31e6f6ca3182bd1f324823e3
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Jan 27 11:38:39 2022 +0200

    DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables (#2436)
    
    * DRILL-8114: Prevent applying Iceberg project on non-Iceberg tables
    
    * DRILL-8114: Change after code review
---
 .../iceberg/plan/IcebergPluginImplementor.java     | 30 +++++++-------
 .../exec/store/iceberg/IcebergQueriesTest.java     | 48 ++++++++++++++++++++++
 .../store/mongo/plan/MongoPluginImplementor.java   | 40 +++++++++++-------
 .../drill/exec/planner/common/DrillRelOptUtil.java |  3 ++
 .../exec/store/plan/AbstractPluginImplementor.java | 16 ++++++++
 5 files changed, 108 insertions(+), 29 deletions(-)

diff --git a/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java b/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
index 1ed9cba..3d85b54 100644
--- a/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
+++ b/contrib/format-iceberg/src/main/java/org/apache/drill/exec/store/iceberg/plan/IcebergPluginImplementor.java
@@ -22,7 +22,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Util;
@@ -30,12 +29,9 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
-import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
-import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.SubsetRemover;
 import org.apache.drill.exec.store.iceberg.IcebergGroupScan;
 import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
@@ -97,9 +93,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
 
     if (expression != null) {
       try {
-        TableScan tableScan = DrillRelOptUtil.findScan(filter.accept(SubsetRemover.INSTANCE));
-        DrillTable drillTable = DrillRelOptUtil.getDrillTable(tableScan);
-        GroupScan scan = drillTable.getGroupScan();
+        GroupScan scan = findGroupScan(filter);
         if (scan instanceof IcebergGroupScan) {
           IcebergGroupScan groupScan = (IcebergGroupScan) scan;
           // ensures that expression compatible with table schema
@@ -107,7 +101,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
         } else {
           return false;
         }
-      } catch (ValidationException | IOException e) {
+      } catch (ValidationException e) {
         return false;
       }
     }
@@ -125,11 +119,14 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
 
   @Override
   public boolean canImplement(DrillLimitRelBase limit) {
-    FirstLimitFinder finder = new FirstLimitFinder();
-    limit.getInput().accept(finder);
-    int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
-    int newLimit = getArtificialLimit(limit);
-    return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+    if (hasPluginGroupScan(limit)) {
+      FirstLimitFinder finder = new FirstLimitFinder();
+      limit.getInput().accept(finder);
+      int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+      int newLimit = getArtificialLimit(limit);
+      return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+    }
+    return false;
   }
 
   @Override
@@ -144,7 +141,7 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
 
   @Override
   public boolean canImplement(Project project) {
-    return true;
+    return hasPluginGroupScan(project);
   }
 
   @Override
@@ -152,6 +149,11 @@ public class IcebergPluginImplementor extends AbstractPluginImplementor {
     return groupScan;
   }
 
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof IcebergGroupScan;
+  }
+
   private int rexLiteralIntValue(RexLiteral offset) {
     return ((BigDecimal) offset.getValue()).intValue();
   }
diff --git a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
index 4e010f2..93c8ea5 100644
--- a/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
+++ b/contrib/format-iceberg/src/test/java/org/apache/drill/exec/store/iceberg/IcebergQueriesTest.java
@@ -685,4 +685,52 @@ public class IcebergQueriesTest extends ClusterTest {
       .go();
   }
 
+  @Test
+  public void testLateralSql() throws Exception {
+    String sql =
+      "SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
+        "FROM cp.`lateraljoin/nested-customer.json` t,\n" +
+        "unnest(t.orders) t2(ord)\n" +
+        "LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("c_name", "o_shop")
+      .baselineValues("customer1", "Meno Park 1st")
+      .go();
+  }
+
+  @Test
+  public void testLateralSqlIceberg() throws Exception {
+    String sql =
+      "SELECT t.int_field, t2.ord.struct_string_field struct_string_field\n" +
+        "FROM dfs.tmp.testAllTypes t,\n" +
+        "unnest(t.repeated_struct_field) t2(ord)\n" +
+        "ORDER BY t.int_field\n" +
+        "LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("int_field", "struct_string_field")
+      .baselineValues(1, "abc")
+      .go();
+  }
+
+  @Test
+  public void testFilterPushCorrelate() throws Exception {
+    String sql =
+      "SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
+        "FROM cp.`lateraljoin/nested-customer.json` t,\n" +
+        "unnest(t.orders) t2(ord)\n" +
+        "WHERE t.c_name='customer1' AND t2.ord.o_shop='Meno Park 1st'";
+
+    testBuilder()
+      .unOrdered()
+      .sqlQuery(sql)
+      .baselineColumns("c_name", "o_shop")
+      .baselineValues("customer1", "Meno Park 1st")
+      .go();
+  }
 }
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index 5fae342..64c9b4e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -231,7 +231,8 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
 
   @Override
   public boolean canImplement(Aggregate aggregate) {
-    return aggregate.getGroupType() == Aggregate.Group.SIMPLE
+    return hasPluginGroupScan(aggregate)
+      && aggregate.getGroupType() == Aggregate.Group.SIMPLE
       && aggregate.getAggCallList().stream()
       .noneMatch(AggregateCall::isDistinct)
       && aggregate.getAggCallList().stream()
@@ -240,41 +241,45 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
 
   @Override
   public boolean canImplement(Filter filter) {
-    LogicalExpression conditionExp = DrillOptiq.toDrill(
-      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
-      filter.getInput(),
-      filter.getCondition());
-    MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
-    filterBuilder.parseTree();
-    return filterBuilder.isAllExpressionsConverted();
+    if (hasPluginGroupScan(filter)) {
+      LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+        filter.getInput(),
+        filter.getCondition());
+      MongoFilterBuilder filterBuilder = new MongoFilterBuilder(conditionExp);
+      filterBuilder.parseTree();
+      return filterBuilder.isAllExpressionsConverted();
+    }
+    return false;
   }
 
   @Override
   public boolean canImplement(DrillLimitRelBase limit) {
-    return true;
+    return hasPluginGroupScan(limit);
   }
 
   @Override
   public boolean canImplement(Project project) {
-    return project.getProjects().stream()
-      .allMatch(RexToMongoTranslator::supportsExpression);
+    return hasPluginGroupScan(project) &&
+      project.getProjects().stream()
+        .allMatch(RexToMongoTranslator::supportsExpression);
   }
 
   @Override
   public boolean canImplement(Sort sort) {
-    return true;
+    return hasPluginGroupScan(sort);
   }
 
   @Override
   public boolean canImplement(Union union) {
     // allow converting for union all only, since Drill adds extra aggregation for union distinct,
     // so we will convert both union all and aggregation later
-    return union.all;
+    return union.all && hasPluginGroupScan(union);
   }
 
   @Override
   public boolean canImplement(TableScan scan) {
-    return true;
+    return hasPluginGroupScan(scan);
   }
 
   @Override
@@ -292,7 +297,12 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
       newSpec, columns, runAggregate);
   }
 
-  public static int direction(RelFieldCollation fieldCollation) {
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof MongoGroupScan;
+  }
+
+  private static int direction(RelFieldCollation fieldCollation) {
     switch (fieldCollation.getDirection()) {
       case DESCENDING:
       case STRICTLY_DESCENDING:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index ec9bf4e..d642154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -325,6 +325,9 @@ public abstract class DrillRelOptUtil {
     for (RelNode rel : rels) {
       if (rel instanceof TableScan) {
         return (TableScan) rel;
+      } else if (rel instanceof RelSubset) {
+        RelSubset relSubset = (RelSubset) rel;
+        return findScan(Util.first(relSubset.getBest(), relSubset.getOriginal()));
       } else {
         return findScan(rel.getInputs().toArray(new RelNode[0]));
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
index 6b49be9..ce3a1e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.plan;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
@@ -25,7 +26,11 @@ import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.util.function.CheckedFunction;
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.plan.rel.PluginAggregateRel;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
 import org.apache.drill.exec.store.plan.rel.PluginJoinRel;
@@ -38,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Abstract base implementation of {@link PluginImplementor} that can be used by
@@ -141,4 +147,14 @@ public abstract class AbstractPluginImplementor implements PluginImplementor {
         .message("Plugin implementor doesn't support push down for %s", rel)
         .build(logger);
   }
+
+  protected GroupScan findGroupScan(RelNode node) {
+    CheckedFunction<DrillTable, GroupScan, IOException> groupScanFunction = DrillTable::getGroupScan;
+    return Optional.ofNullable(DrillRelOptUtil.findScan(node))
+      .map(DrillRelOptUtil::getDrillTable)
+      .map(groupScanFunction)
+      .orElse(null);
+  }
+
+  protected abstract boolean hasPluginGroupScan(RelNode node);
 }