Posted to commits@drill.apache.org by vo...@apache.org on 2018/07/23 11:45:13 UTC
[drill] 01/02: DRILL-6574: Add option to push LIMIT(0) on top of SCAN (late limit 0 optimization)
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit aa6a898a596ce9a8e2677ebc4987947bffc56dcb
Author: Sudheesh Katkam <sk...@maprtech.com>
AuthorDate: Mon Mar 21 15:32:47 2016 -0700
DRILL-6574: Add option to push LIMIT(0) on top of SCAN (late limit 0 optimization)
---
.../java/org/apache/drill/exec/ExecConstants.java | 3 +
.../planner/sql/handlers/DefaultSqlHandler.java | 3 +
.../planner/sql/handlers/FindLimit0Visitor.java | 119 +++++++++++++++++++++
.../exec/server/options/SystemOptionManager.java | 1 +
.../java-exec/src/main/resources/drill-module.conf | 3 +-
.../java/org/apache/drill/TestPartitionFilter.java | 4 +-
.../impl/limit/TestLateLimit0Optimization.java | 112 +++++++++++++++++++
7 files changed, 243 insertions(+), 2 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index d0842d2..282ad30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -453,6 +453,9 @@ public final class ExecConstants {
public static final String EARLY_LIMIT0_OPT_KEY = "planner.enable_limit0_optimization";
public static final BooleanValidator EARLY_LIMIT0_OPT = new BooleanValidator(EARLY_LIMIT0_OPT_KEY);
+ public static final String LATE_LIMIT0_OPT_KEY = "planner.enable_limit0_on_scan";
+ public static final BooleanValidator LATE_LIMIT0_OPT = new BooleanValidator(LATE_LIMIT0_OPT_KEY);
+
public static final String ENABLE_MEMORY_ESTIMATION_KEY = "planner.memory.enable_memory_estimation";
public static final OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY);
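`planner.enable_limit0_on_scan` is a new boolean option, registered further down in SystemOptionManager and given a default in drill-module.conf. As a quick illustration (not part of the commit), a BaseTestQuery-style test can toggle it per session; the test() helper and the cp.`tpch/region.parquet` sample are the same ones used by the new test class at the end of this commit:

    @Test
    public void toggleLateLimit0Option() throws Exception {
      // Hypothetical test method, shown only to illustrate switching the option at runtime.
      test("ALTER SESSION SET `planner.enable_limit0_on_scan` = false");
      try {
        // While the option is off, this LIMIT 0 query is planned without the extra Limit(0) above the Scan.
        test("SELECT * FROM (SELECT r_regionkey FROM cp.`tpch/region.parquet`) t LIMIT 0");
      } finally {
        test("ALTER SESSION RESET `planner.enable_limit0_on_scan`");
      }
    }
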
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 83e1a8f..cc2ec60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -287,6 +287,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
if (FindLimit0Visitor.containsLimit0(convertedRelNodeWithSum0) &&
FindHardDistributionScans.canForceSingleMode(convertedRelNodeWithSum0)) {
context.getPlannerSettings().forceSingleMode();
+ if (context.getOptions().getOption(ExecConstants.LATE_LIMIT0_OPT)) {
+ return FindLimit0Visitor.addLimitOnTopOfLeafNodes(drillRel);
+ }
}
return drillRel;
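The rewrite is gated twice: FindLimit0Visitor.containsLimit0() must see a LIMIT 0 at the root of the converted tree, and FindHardDistributionScans.canForceSingleMode() must allow single-mode execution; only then, and only with `planner.enable_limit0_on_scan` enabled, is the logical rel handed to addLimitOnTopOfLeafNodes. A query shape that should take this branch (an assumption based on the tests added below, not verified against a running plan):

    // The outer LIMIT 0 satisfies containsLimit0(); a classpath Parquet scan has no hard
    // distribution requirement, so canForceSingleMode() should hold as well.
    String query = "SELECT * FROM (SELECT r_regionkey, r_name FROM cp.`tpch/region.parquet`) t LIMIT 0";
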
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
index 8031609..3746d7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/FindLimit0Visitor.java
@@ -22,14 +22,23 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataTypeField;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos;
@@ -42,6 +51,8 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -49,6 +60,14 @@ import org.apache.drill.exec.store.direct.DirectGroupScan;
import java.util.ArrayList;
import java.util.List;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
+import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.common.DrillUnionRelBase;
+import org.apache.drill.exec.util.Pointer;
+
+import java.math.BigDecimal;
+import java.util.Set;
/**
* Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
@@ -128,6 +147,100 @@ public class FindLimit0Visitor extends RelShuttleImpl {
private boolean contains = false;
+ private static final Set<String> unsupportedFunctions = ImmutableSet.<String>builder()
+ // see Mappify
+ .add("KVGEN")
+ .add("MAPPIFY")
+ // see DummyFlatten
+ .add("FLATTEN")
+ // see JsonConvertFrom
+ .add("CONVERT_FROMJSON")
+ // see JsonConvertTo class
+ .add("CONVERT_TOJSON")
+ .add("CONVERT_TOSIMPLEJSON")
+ .add("CONVERT_TOEXTENDEDJSON")
+ .build();
+
+ private static boolean isUnsupportedScalarFunction(final SqlOperator operator) {
+ return operator instanceof DrillSqlOperator &&
+ unsupportedFunctions.contains(operator.getName().toUpperCase());
+ }
+
+ /**
+ * TODO(DRILL-3993): Use RelBuilder to create a limit node to allow for applying this optimization in potentially
+ * any of the transformations, but currently this can be applied after Drill logical transformation, and before
+ * Drill physical transformation.
+ */
+ public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) {
+ final Pointer<Boolean> isUnsupported = new Pointer<>(false);
+
+ // to visit unsupported functions
+ final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() {
+ @Override
+ public RexNode visitCall(RexCall call) {
+ final SqlOperator operator = call.getOperator();
+ if (isUnsupportedScalarFunction(operator)) {
+ isUnsupported.value = true;
+ return call;
+ }
+ return super.visitCall(call);
+ }
+ };
+
+ // to visit unsupported operators
+ final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof DrillUnionRelBase) {
+ isUnsupported.value = true;
+ return other;
+ } else if (other instanceof DrillProjectRelBase) {
+ other.accept(unsupportedFunctionsVisitor);
+ if (isUnsupported.value) {
+ return other;
+ }
+ }
+ return super.visit(other);
+ }
+ };
+
+ rel.accept(unsupportedOperationsVisitor);
+ if (isUnsupported.value) {
+ return rel;
+ }
+
+ // to add LIMIT (0) on top of leaf nodes
+ final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() {
+
+ private RelNode addLimitAsParent(RelNode node) {
+ final RexBuilder builder = node.getCluster().getRexBuilder();
+ final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO);
+ final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO);
+ return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, offset, fetch);
+ }
+
+ @Override
+ public RelNode visit(LogicalValues values) {
+ return addLimitAsParent(values);
+ }
+
+ @Override
+ public RelNode visit(TableScan scan) {
+ return addLimitAsParent(scan);
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other.getInputs().size() == 0) { // leaf operator
+ return addLimitAsParent(other);
+ }
+ return super.visit(other);
+ }
+ };
+
+ return (DrillRel) rel.accept(addLimitOnScanVisitor);
+ }
+
private FindLimit0Visitor() {
}
@@ -147,6 +260,11 @@ public class FindLimit0Visitor extends RelShuttleImpl {
@Override
public RelNode visit(RelNode other) {
+ if (other instanceof DrillJoinRelBase ||
+ other instanceof DrillAggregateRelBase ||
+ other instanceof DrillUnionRelBase) {
+ return other;
+ }
if (other instanceof DrillLimitRel) {
if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
contains = true;
@@ -157,6 +275,7 @@ public class FindLimit0Visitor extends RelShuttleImpl {
return super.visit(other);
}
+ // TODO: The following nodes are never visited because this visitor is used after logical transformation!
// The following set of RelNodes should terminate a search for the limit 0 pattern as they won't convey its meaning.
@Override
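
Net effect of addLimitOnTopOfLeafNodes: when none of the listed unsupported functions (KVGEN, FLATTEN, the JSON CONVERT_FROM/CONVERT_TO variants) or operators (union) appear, every leaf rel — a TableScan, a LogicalValues, or any zero-input node — gets a DrillLimitRel with offset 0 and fetch 0 as its new parent, so readers return schema only and no rows. A sketch of the intended logical plan shape (illustrative only; real EXPLAIN output carries more operators and attributes, and the new test class below checks the same shape with a regex):

    before:
      Limit(fetch=[0])
        Project(...)
          Scan(cp.`tpch/region.parquet`)

    after:
      Limit(fetch=[0])
        Project(...)
          Limit(offset=[0], fetch=[0])
            Scan(cp.`tpch/region.parquet`)
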
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5ee3825..a627821 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -189,6 +189,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE),
new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
+ new OptionDefinition(ExecConstants.LATE_LIMIT0_OPT),
new OptionDefinition(ExecConstants.ENABLE_MEMORY_ESTIMATION),
new OptionDefinition(ExecConstants.MAX_QUERY_MEMORY_PER_NODE),
new OptionDefinition(ExecConstants.PERCENT_MEMORY_PER_QUERY),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 19e779d..16a285b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -508,7 +508,8 @@ drill.exec.options: {
planner.enable_hep_opt: true,
planner.enable_hep_partition_pruning: true,
planner.enable_join_optimization: true,
- planner.enable_limit0_optimization: false,
+ planner.enable_limit0_optimization: true,
+ planner.enable_limit0_on_scan: true,
planner.enable_mergejoin: true,
planner.enable_multiphase_agg: true,
planner.enable_mux_exchange: true,
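Note that besides adding the new key with a default of true, this hunk also flips the default of the existing early optimization, planner.enable_limit0_optimization, from false to true. If the late optimization needs to be switched off cluster-wide without editing drill-module.conf, the usual runtime override applies (illustrative only, mirroring the session-level example earlier; the same statement can be issued from any SQL client):

    test("ALTER SYSTEM SET `planner.enable_limit0_on_scan` = false");
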
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index febabfe..8b001f6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -280,7 +280,9 @@ public class TestPartitionFilter extends PlanTestBase {
public void testMainQueryFilterRegularColumn() throws Exception {
String query = "select * from (select dir0, o_custkey from dfs.`multilevel/parquet` where dir0='1994' and o_custkey = 10) t limit 0";
// with Parquet RG filter pushdown, reduce to 1 file ( o_custkey all > 10).
- testIncludeFilter(query, 1, "Filter\\(", 0);
+ // There is a LIMIT(0) inserted on top of SCAN, so filter push down is not applied.
+ // Since this is a LIMIT 0 query, not pushing down the filter should not cause a perf. regression.
+ testIncludeFilter(query, 4, "Filter", 0);
}
@Test // see DRILL-2852 and DRILL-3591
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
new file mode 100644
index 0000000..f819260
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLateLimit0Optimization.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.PlanTestBase;
+import org.junit.Test;
+
+public class TestLateLimit0Optimization extends BaseTestQuery {
+
+ private static String wrapLimit0(final String query) {
+ return "SELECT * FROM (" + query + ") LZT LIMIT 0";
+ }
+
+ private static void checkThatQueryIsOptimized(final String query) throws Exception {
+ PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query),
+ new String[]{
+ ".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*"
+ }, new String[]{});
+ }
+
+ private static void checkThatQueryIsNotOptimized(final String query) throws Exception {
+ PlanTestBase.testPlanMatchingPatterns(wrapLimit0(query),
+ new String[]{},
+ new String[]{
+ ".*Limit\\(offset=\\[0\\], fetch=\\[0\\]\\)(.*[\n\r])+.*Scan.*"
+ });
+ }
+
+ @Test
+ public void convertFromJson() throws Exception {
+ checkThatQueryIsNotOptimized("SELECT CONVERT_FROM('{x:100, y:215.6}' ,'JSON') AS MYCOL FROM (VALUES(1))");
+ }
+
+ @Test
+ public void convertToIntBE() throws Exception {
+ checkThatQueryIsOptimized("SELECT CONVERT_TO(r_regionkey, 'INT_BE') FROM cp.`tpch/region.parquet`");
+ }
+
+ @Test
+ public void convertToOthers() throws Exception {
+ checkThatQueryIsOptimized("SELECT r_regionkey,\n" +
+ " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT')) as i,\n" +
+ " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_BE')) as i_be,\n" +
+ " STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l,\n" +
+ " STRING_BINARY(CONVERT_TO(r_regionkey, 'BIGINT')) as l_be,\n" +
+ " STRING_BINARY(CONVERT_TO(r_name, 'UTF8')) u8,\n" +
+ " STRING_BINARY(CONVERT_TO(r_name, 'UTF16')) u16,\n" +
+ " STRING_BINARY(CONVERT_TO(r_regionkey, 'INT_HADOOPV')) as l_be\n" +
+ "FROM cp.`tpch/region.parquet`");
+ }
+
+ @Test
+ public void union() throws Exception {
+ checkThatQueryIsNotOptimized("(select n_regionkey from cp.`tpch/nation.parquet`) union " +
+ "(select r_regionname from cp.`tpch/region.parquet`)");
+ }
+
+ @Test
+ public void unionAll() throws Exception {
+ checkThatQueryIsNotOptimized("(select n_regionkey from cp.`tpch/nation.parquet`) union all " +
+ "(select r_regionname from cp.`tpch/region.parquet`)");
+ }
+
+ @Test
+ public void flatten() throws Exception {
+ checkThatQueryIsNotOptimized("select flatten(arr) as a from cp.`/flatten/drill-3370.json`");
+ }
+
+ @Test
+ public void flatten2() throws Exception {
+ checkThatQueryIsNotOptimized("select uid, lst_lst, d.lst_lst[1], flatten(d.lst_lst) lst " +
+ "from cp.`tpch/region.parquet` d order by d.lst_lst[1][2]"); // table is just for validation
+ }
+
+ @Test
+ public void flatten3() throws Exception {
+ checkThatQueryIsNotOptimized("select s.evnts.evnt_id from (select d.type type, flatten(d.events) evnts from " +
+ "cp.`tpch/region.parquet` d where d.type='web' order by d.uid) s " +
+ "where s.evnts.type = 'cmpgn4' and s.type='web'"); // table is just for validation
+ }
+
+ @Test
+ public void flatten4() throws Exception {
+ checkThatQueryIsNotOptimized("select flatten(lst) from (select uid, flatten(d.lst_lst) lst from " +
+ "cp.`tpch/region.parquet` d) s1 order by s1.lst[3]"); // table is just for validation
+ }
+
+ @Test
+ public void countDistinct() throws Exception {
+ checkThatQueryIsOptimized("SELECT COUNT(employee_id), " +
+ "SUM(employee_id), " +
+ "COUNT(DISTINCT employee_id) " +
+ "FROM cp.`employee.json`");
+ }
+
+}
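
For completeness, a hedged sketch of the client-side pattern this change targets (not part of the commit; the JDBC URL is an assumption): BI tools commonly wrap queries in LIMIT 0 to discover the result schema, and with `planner.enable_limit0_on_scan` such a wrapped query now stops at the scans without reading data.

    // A minimal JDBC sketch of the LIMIT 0 "schema probe" pattern; connection details are assumed.
    import java.sql.*;

    public class Limit0SchemaProbe {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT * FROM (SELECT * FROM cp.`employee.json`) t LIMIT 0")) {
          // Only metadata is consulted; the query itself returns zero rows.
          ResultSetMetaData md = rs.getMetaData();
          for (int i = 1; i <= md.getColumnCount(); i++) {
            System.out.println(md.getColumnName(i) + " : " + md.getColumnTypeName(i));
          }
        }
      }
    }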