You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/07/23 11:45:13 UTC

[drill] 01/02: DRILL-6574: Add option to push LIMIT(0) on top of SCAN (late limit 0 optimization)

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit aa6a898a596ce9a8e2677ebc4987947bffc56dcb
Author: Sudheesh Katkam <sk...@maprtech.com>
AuthorDate: Mon Mar 21 15:32:47 2016 -0700

    DRILL-6574: Add option to push LIMIT(0) on top of SCAN (late limit 0 optimization)
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   3 +
 .../planner/sql/handlers/DefaultSqlHandler.java    |   3 +
 .../planner/sql/handlers/FindLimit0Visitor.java    | 119 +++++++++++++++++++++
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../java-exec/src/main/resources/drill-module.conf |   3 +-
 .../java/org/apache/drill/TestPartitionFilter.java |   4 +-
 .../impl/limit/TestLateLimit0Optimization.java     | 112 +++++++++++++++++++
 7 files changed, 243 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d0842d2..282ad30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -453,6 +453,9 @@ public final class ExecConstants {
   public static final String EARLY_LIMIT0_OPT_KEY = "planner.enable_limit0_optimization";
   public static final BooleanValidator EARLY_LIMIT0_OPT = new BooleanValidator(EARLY_LIMIT0_OPT_KEY);
 
+  public static final String LATE_LIMIT0_OPT_KEY = "planner.enable_limit0_on_scan";
+  public static final BooleanValidator LATE_LIMIT0_OPT = new BooleanValidator(LATE_LIMIT0_OPT_KEY);
+
   public static final String ENABLE_MEMORY_ESTIMATION_KEY = "planner.memory.enable_memory_estimation";
   public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 83e1a8f..cc2ec60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -287,6 +287,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
         if (FindLimit0Visitor.containsLimit0(convertedRelNodeWithSum0) &&
             FindHardDistributionScans.canForceSingleMode(convertedRelNodeWithSum0)) {
           context.getPlannerSettings().forceSingleMode();
+          if (context.getOptions().getOption(ExecConstants.LATE_LIMIT0_OPT)) {
+            return FindLimit0Visitor.addLimitOnTopOfLeafNodes(drillRel);
+          }
         }
 
         return drillRel;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 8031609..3746d7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -22,14 +22,23 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMinus;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.type.RelDataTypeField;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos;
@@ -42,6 +51,8 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
 import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -49,6 +60,14 @@ import org.apache.drill.exec.store.direct.DirectGroupScan;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.util.Pointer;
+
+import java.math.BigDecimal;
+import java.util.Set;
 
 /**
  * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
@@ -128,6 +147,100 @@ public class FindLimit0Visitor extends RelShuttleImpl {
 
   private boolean contains = false;
 
+  private static final Set<String> unsupportedFunctions = ImmutableSet.<String>builder()
+      // see Mappify
+      .add("KVGEN")
+      .add("MAPPIFY")
+      // see DummyFlatten
+      .add("FLATTEN")
+      // see JsonConvertFrom
+      .add("CONVERT_FROMJSON")
+      // see JsonConvertTo class
+      .add("CONVERT_TOJSON")
+      .add("CONVERT_TOSIMPLEJSON")
+      .add("CONVERT_TOEXTENDEDJSON")
+      .build();
+
+  /**
+   * Checks whether the given operator is a Drill function for which the late LIMIT 0 rewrite must be skipped.
+   *
+   * @param operator operator taken from a call expression
+   * @return true if the operator is a {@link DrillSqlOperator} whose name, compared case-insensitively, is listed
+   *         in {@code unsupportedFunctions}
+   */
+  private static boolean isUnsupportedScalarFunction(final SqlOperator operator) {
+    // The set holds ASCII upper-case names, so upper-case with Locale.ROOT: with a default locale such as Turkish,
+    // "mappify".toUpperCase() produces a dotted capital I and the lookup would silently miss.
+    return operator instanceof DrillSqlOperator &&
+        unsupportedFunctions.contains(operator.getName().toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * TODO(DRILL-3993): Use RelBuilder to create a limit node to allow for applying this optimization in potentially
+   * any of the transformations, but currently this can be applied after Drill logical transformation, and before
+   * Drill physical transformation.
+   */
+  public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) {
+    final Pointer<Boolean> isUnsupported = new Pointer<>(false);
+
+    // to visit unsupported functions
+    final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() {
+      @Override
+      public RexNode visitCall(RexCall call) {
+        final SqlOperator operator = call.getOperator();
+        if (isUnsupportedScalarFunction(operator)) {
+          isUnsupported.value = true;
+          return call;
+        }
+        return super.visitCall(call);
+      }
+    };
+
+    // to visit unsupported operators
+    final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
+      @Override
+      public RelNode visit(RelNode other) {
+        if (other instanceof DrillUnionRelBase) {
+          isUnsupported.value = true;
+          return other;
+        } else if (other instanceof DrillProjectRelBase) {
+          other.accept(unsupportedFunctionsVisitor);
+          if (isUnsupported.value) {
+            return other;
+          }
+        }
+        return super.visit(other);
+      }
+    };
+
+    rel.accept(unsupportedOperationsVisitor);
+    if (isUnsupported.value) {
+      return rel;
+    }
+
+    // to add LIMIT (0) on top of leaf nodes
+    final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() {
+
+      private RelNode addLimitAsParent(RelNode node) {
+        final RexBuilder builder = node.getCluster().getRexBuilder();
+        final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO);
+        final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO);
+        return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, offset, fetch);
+      }
+
+      @Override
+      public RelNode visit(LogicalValues values) {
+        return addLimitAsParent(values);
+      }
+
+      @Override
+      public RelNode visit(TableScan scan) {
+        return addLimitAsParent(scan);
+      }
+
+      @Override
+      public RelNode visit(RelNode other) {
+        if (other.getInputs().size() == 0) { // leaf operator
+          return addLimitAsParent(other);
+        }
+        return super.visit(other);
+      }
+    };
+
+    return (DrillRel) rel.accept(addLimitOnScanVisitor);
+  }
+
   private FindLimit0Visitor() {
   }
 
@@ -147,6 +260,11 @@ public class FindLimit0Visitor extends RelShuttleImpl {
 
   @Override
   public RelNode visit(RelNode other) {
+    if (other instanceof DrillJoinRelBase ||
+        other instanceof DrillAggregateRelBase ||
+        other instanceof DrillUnionRelBase) {
+      return other;
+    }
     if (other instanceof DrillLimitRel) {
       if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
         contains = true;
@@ -157,6 +275,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
     return super.visit(other);
   }
 
+  // TODO: The following nodes are never visited because this visitor is used after logical transformation!
   // The following set of RelNodes should terminate a search for the limit 0 pattern as they won't convey its meaning.
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5ee3825..a627821 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -189,6 +189,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
       new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
+      new OptionDefinition(ExecConstants.LATE_LIMIT0_OPT),
       new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION),
       new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE),
       new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 19e779d..16a285b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -508,7 +508,8 @@ drill.exec.options: {
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
     planner.enable_join_optimization: true,
-    planner.enable_limit0_optimization: false,
+    planner.enable_limit0_optimization: true,
+    planner.enable_limit0_on_scan: true,
     planner.enable_mergejoin: true,
     planner.enable_multiphase_agg: true,
     planner.enable_mux_exchange: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index febabfe..8b001f6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -280,7 +280,9 @@ public class TestPartitionFilter extends PlanTestBase {
   public void testMainQueryFilterRegularColumn() throws Exception {
     String query = "select * from (select dir0, o_custkey from dfs.`multilevel/parquet` where dir0='1994' and o_custkey = 10) t limit 0";
     // with Parquet RG filter pushdown, reduce to 1 file ( o_custkey all > 10).
-    testIncludeFilter(query, 1, "Filter\\(", 0);
+    // There is a LIMIT(0) inserted on top of SCAN, so filter push down is not applied.
+    // Since this is a LIMIT 0 query, not pushing down the filter should not cause a perf. regression.
+    testIncludeFilter(query, 4, "Filter", 0);
   }
 
   @Test // see DRILL-2852 and DRILL-3591
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
new file mode 100644
index 0000000..f819260
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.junit.Test;
+
+/**
+ * Plan-level checks for the late LIMIT 0 optimization. Every query is wrapped in an outer {@code LIMIT 0} and the
+ * plan text is matched against a pattern describing a zero-row Limit followed, on a later line, by a Scan.
+ */
+public class TestLateLimit0Optimization extends BaseTestQuery {
+
+  // Plan regex: Limit(offset=[0], fetch=[0]), then at least one line break, then a line containing Scan.
+  private static final String LIMIT_0_ABOVE_SCAN =
+      ".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*";
+
+  // Turns the query into a sub-query of an outer SELECT * ... LIMIT 0.
+  private static String wrapLimit0(final String query) {
+    return String.format("SELECT * FROM (%s) LZT LIMIT 0", query);
+  }
+
+  // Passes the Limit-above-Scan pattern as the first pattern array (presumably the patterns the plan must match).
+  private static void checkThatQueryIsOptimized(final String query) throws Exception {
+    final String[] firstPatterns = new String[]{LIMIT_0_ABOVE_SCAN};
+    final String[] secondPatterns = new String[]{};
+    PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query), firstPatterns, secondPatterns);
+  }
+
+  // Passes the Limit-above-Scan pattern as the second pattern array (presumably the patterns the plan must not match).
+  private static void checkThatQueryIsNotOptimized(final String query) throws Exception {
+    final String[] firstPatterns = new String[]{};
+    final String[] secondPatterns = new String[]{LIMIT_0_ABOVE_SCAN};
+    PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query), firstPatterns, secondPatterns);
+  }
+
+  @Test
+  public void convertFromJson() throws Exception {
+    final String query = "SELECT CONVERT_FROM('{x:100, y:215.6}' ,'JSON') AS MYCOL FROM (VALUES(1))";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void convertToIntBE() throws Exception {
+    final String query = "SELECT CONVERT_TO(r_regionkey, 'INT_BE') FROM cp.`tpch/region.parquet`";
+    checkThatQueryIsOptimized(query);
+  }
+
+  @Test
+  public void convertToOthers() throws Exception {
+    final String query = "SELECT r_regionkey,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT')) as i,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_BE')) as i_be,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l_be,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_name, 'UTF8')) u8,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_name, 'UTF16')) u16,\n" +
+        "  STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_HADOOPV')) as l_be\n" +
+        "FROM cp.`tpch/region.parquet`";
+    checkThatQueryIsOptimized(query);
+  }
+
+  @Test
+  public void union() throws Exception {
+    final String query = "(select n_regionkey from cp.`tpch/nation.parquet`) union " +
+        "(select r_regionname from cp.`tpch/region.parquet`)";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void unionAll() throws Exception {
+    final String query = "(select n_regionkey from cp.`tpch/nation.parquet`) union all " +
+        "(select r_regionname from cp.`tpch/region.parquet`)";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void flatten() throws Exception {
+    final String query = "select flatten(arr) as a from cp.`/flatten/drill-3370.json`";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void flatten2() throws Exception {
+    // table is just for validation
+    final String query = "select uid, lst_lst, d.lst_lst[1], flatten(d.lst_lst) lst " +
+        "from cp.`tpch/region.parquet` d order by d.lst_lst[1][2]";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void flatten3() throws Exception {
+    // table is just for validation
+    final String query = "select s.evnts.evnt_id from (select d.type type, flatten(d.events) evnts from " +
+        "cp.`tpch/region.parquet` d where d.type='web' order by d.uid) s " +
+        "where s.evnts.type = 'cmpgn4' and s.type='web'";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void flatten4() throws Exception {
+    // table is just for validation
+    final String query = "select flatten(lst) from (select uid, flatten(d.lst_lst) lst from " +
+        "cp.`tpch/region.parquet` d) s1 order by s1.lst[3]";
+    checkThatQueryIsNotOptimized(query);
+  }
+
+  @Test
+  public void countDistinct() throws Exception {
+    final String query = "SELECT COUNT(employee_id), " +
+            "SUM(employee_id), " +
+            "COUNT(DISTINCT employee_id) " +
+            "FROM cp.`employee.json`";
+    checkThatQueryIsOptimized(query);
+  }
+
+}