You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/06/22 21:14:32 UTC

[drill] 01/09: DRILL-5188: Expand sub-queries using rules

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 502d2977092eecda0a4aa0482b5f96459c315227
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Fri May 18 15:54:16 2018 +0300

    DRILL-5188: Expand sub-queries using rules
    
    - Add check for agg with group by literal
    - Allow NLJ for limit 1
    - Implement single_value aggregate function
    
    closes #1321
---
 exec/java-exec/src/main/codegen/config.fmpp        |   1 +
 .../src/main/codegen/data/SingleValue.tdd          |  62 +++++++++
 .../src/main/codegen/templates/SingleValueAgg.java | 144 +++++++++++++++++++++
 .../drill/exec/physical/impl/join/JoinUtils.java   | 105 ++++++++++++++-
 .../apache/drill/exec/planner/PlannerPhase.java    |  10 ++
 .../apache/drill/exec/planner/RuleInstance.java    |  10 ++
 .../exec/planner/logical/PreProcessLogicalRel.java |  18 +--
 .../drill/exec/planner/sql/SqlConverter.java       |   8 +-
 .../planner/sql/handlers/DefaultSqlHandler.java    |  12 +-
 .../java/org/apache/drill/TestCorrelation.java     |  46 +++++++
 .../apache/drill/TestDisabledFunctionality.java    |  37 ------
 .../java/org/apache/drill/TestExampleQueries.java  |  33 +++++
 .../java/org/apache/drill/TestTpchDistributed.java |   1 -
 .../apache/drill/TestTpchDistributedStreaming.java |   1 -
 .../java/org/apache/drill/TestTpchExplain.java     |   1 -
 .../test/java/org/apache/drill/TestTpchLimit0.java |   1 -
 .../java/org/apache/drill/TestTpchPlanning.java    |   1 -
 .../java/org/apache/drill/TestTpchSingleMode.java  |   1 -
 .../drill/exec/fn/impl/TestAggregateFunctions.java |  80 ++++++++++++
 pom.xml                                            |   2 +-
 20 files changed, 504 insertions(+), 70 deletions(-)

diff --git a/exec/java-exec/src/main/codegen/config.fmpp b/exec/java-exec/src/main/codegen/config.fmpp
index 50f110d..e233974 100644
--- a/exec/java-exec/src/main/codegen/config.fmpp
+++ b/exec/java-exec/src/main/codegen/config.fmpp
@@ -43,6 +43,7 @@ data: {
     intervalNumericTypes:     tdd(../data/IntervalNumericTypes.tdd),
     extract:                  tdd(../data/ExtractTypes.tdd),
     sumzero:                  tdd(../data/SumZero.tdd),
+    singleValue:              tdd(../data/SingleValue.tdd),
     numericTypes:             tdd(../data/NumericTypes.tdd),
     casthigh:                 tdd(../data/CastHigh.tdd),
     countAggrTypes:           tdd(../data/CountAggrTypes.tdd)
diff --git a/exec/java-exec/src/main/codegen/data/SingleValue.tdd b/exec/java-exec/src/main/codegen/data/SingleValue.tdd
new file mode 100644
index 0000000..a42fe3b
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/data/SingleValue.tdd
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+types: [
+    {inputType: "Bit", outputType: "NullableBit", runningType: "Bit", major: "primitive"},
+    {inputType: "TinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"},
+    {inputType: "NullableTinyInt", outputType: "NullableTinyInt", runningType: "TinyInt", major: "primitive"},
+    {inputType: "UInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"},
+    {inputType: "NullableUInt1", outputType: "NullableUInt1", runningType: "UInt1", major: "primitive"},
+    {inputType: "UInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"},
+    {inputType: "NullableUInt2", outputType: "NullableUInt2", runningType: "UInt2", major: "primitive"},
+    {inputType: "SmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"},
+    {inputType: "NullableSmallInt", outputType: "NullableSmallInt", runningType: "SmallInt", major: "primitive"},
+    {inputType: "UInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"},
+    {inputType: "NullableUInt4", outputType: "NullableUInt4", runningType: "UInt4", major: "primitive"},
+    {inputType: "UInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"},
+    {inputType: "NullableUInt8", outputType: "NullableUInt8", runningType: "UInt8", major: "primitive"},
+    {inputType: "Int", outputType: "NullableInt", runningType: "Int", major: "primitive"},
+    {inputType: "BigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"},
+    {inputType: "NullableBit", outputType: "NullableBit", runningType: "Bit", major: "primitive"},
+    {inputType: "NullableInt", outputType: "NullableInt", runningType: "Int", major: "primitive"},
+    {inputType: "NullableBigInt", outputType: "NullableBigInt", runningType: "BigInt", major: "primitive"},
+    {inputType: "Float4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"},
+    {inputType: "Float8", outputType: "NullableFloat8", runningType: "Float8", major: "primitive"},
+    {inputType: "NullableFloat4", outputType: "NullableFloat4", runningType: "Float4", major: "primitive"},
+    {inputType: "NullableFloat8", outputType: "NullableFloat8", runningType: "Float8", major: "primitive"},
+    {inputType: "Date", outputType: "NullableDate", runningType: "Date", major: "primitive"},
+    {inputType: "NullableDate", outputType: "NullableDate", runningType: "Date", major: "primitive"},
+    {inputType: "TimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"},
+    {inputType: "NullableTimeStamp", outputType: "NullableTimeStamp", runningType: "TimeStamp", major: "primitive"},
+    {inputType: "Time", outputType: "NullableTime", runningType: "Time", major: "primitive"},
+    {inputType: "NullableTime", outputType: "NullableTime", runningType: "Time", major: "primitive"},
+    {inputType: "IntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"},
+    {inputType: "NullableIntervalDay", outputType: "NullableIntervalDay", runningType: "IntervalDay", major: "IntervalDay"},
+    {inputType: "IntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"},
+    {inputType: "NullableIntervalYear", outputType: "NullableIntervalYear", runningType: "IntervalYear", major: "primitive"},
+    {inputType: "Interval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"},
+    {inputType: "NullableInterval", outputType: "NullableInterval", runningType: "Interval", major: "Interval"},
+    {inputType: "VarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"},
+    {inputType: "NullableVarDecimal", outputType: "NullableVarDecimal", runningType: "VarDecimal", major: "VarDecimal"},
+    {inputType: "VarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"},
+    {inputType: "NullableVarChar", outputType: "NullableVarChar", runningType: "VarChar", major: "bytes"},
+    {inputType: "Var16Char", outputType: "NullableVar16Char", runningType: "Var16Char", major: "bytes"},
+    {inputType: "NullableVar16Char", outputType: "NullableVar16Char", runningType: "Var16Char", major: "bytes"},
+    {inputType: "VarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"},
+    {inputType: "NullableVarBinary", outputType: "NullableVarBinary", runningType: "VarBinary", major: "bytes"}
+   ]
+}
diff --git a/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java
new file mode 100644
index 0000000..c0ff6cf
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/SingleValueAgg.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/SingleValueFunctions.java" />
+
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.expr.fn.impl.gaggr;
+
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+
+import javax.inject.Inject;
+import io.netty.buffer.DrillBuf;
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+@SuppressWarnings("unused")
+public class SingleValueFunctions {
+<#list singleValue.types as type>
+
+  @FunctionTemplate(name = "single_value",
+                  <#if type.major == "VarDecimal">
+                    returnType = FunctionTemplate.ReturnType.DECIMAL_AVG_AGGREGATE,
+                  </#if>
+                    scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+  public static class ${type.inputType}SingleValue implements DrillAggFunc {
+    @Param ${type.inputType}Holder in;
+    @Workspace ${type.runningType}Holder value;
+    @Output ${type.outputType}Holder out;
+    @Workspace BigIntHolder nonNullCount;
+    <#if type.major == "VarDecimal" || type.major == "bytes">
+    @Inject DrillBuf buffer;
+    </#if>
+
+    public void setup() {
+      nonNullCount = new BigIntHolder();
+      nonNullCount.value = 0;
+      value = new ${type.runningType}Holder();
+    }
+
+    @Override
+    public void add() {
+    <#if type.inputType?starts_with("Nullable")>
+      sout: {
+        if (in.isSet == 0) {
+          // processing nullable input and the value is null, so don't do anything...
+          break sout;
+        }
+	  </#if>
+      if (nonNullCount.value == 0) {
+        nonNullCount.value = 1;
+      } else {
+        throw org.apache.drill.common.exceptions.UserException.functionError()
+            .message("Input for single_value function has more than one row")
+            .build();
+      }
+    <#if type.major == "primitive">
+      value.value = in.value;
+    <#elseif type.major == "IntervalDay">
+      value.days = in.days;
+      value.milliseconds = in.milliseconds;
+    <#elseif type.major == "Interval">
+      value.days = in.days;
+      value.milliseconds = in.milliseconds;
+      value.months = in.months;
+    <#elseif type.major == "VarDecimal">
+      value.start = in.start;
+      value.end = in.end;
+      value.buffer = in.buffer;
+      value.scale = in.scale;
+      value.precision = in.precision;
+    <#elseif type.major == "bytes">
+      value.start = in.start;
+      value.end = in.end;
+      value.buffer = in.buffer;
+    </#if>
+    <#if type.inputType?starts_with("Nullable")>
+      } // end of sout block
+	  </#if>
+    }
+
+    @Override
+    public void output() {
+      if (nonNullCount.value > 0) {
+        out.isSet = 1;
+      <#if type.major == "primitive">
+        out.value = value.value;
+      <#elseif type.major == "IntervalDay">
+        out.days = value.days;
+        out.milliseconds = value.milliseconds;
+      <#elseif type.major == "Interval">
+        out.days = value.days;
+        out.milliseconds = value.milliseconds;
+        out.months = value.months;
+      <#elseif type.major == "VarDecimal">
+        out.start = value.start;
+        out.end = value.end;
+        out.buffer = buffer.reallocIfNeeded(value.end - value.start);
+        out.buffer.writeBytes(value.buffer, value.start, value.end - value.start);
+        out.scale = value.scale;
+        out.precision = value.precision;
+      <#elseif type.major == "bytes">
+        out.start = value.start;
+        out.end = value.end;
+        out.buffer = buffer.reallocIfNeeded(value.end - value.start);
+        out.buffer.writeBytes(value.buffer, value.start, value.end - value.start);
+      </#if>
+      } else {
+        out.isSet = 0;
+      }
+    }
+
+    @Override
+    public void reset() {
+      value = new ${type.runningType}Holder();
+      nonNullCount.value = 0;
+    }
+  }
+</#list>
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index e4dab91..b974537 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -17,9 +17,22 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.Util;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.calcite.rel.RelNode;
@@ -35,9 +48,11 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.resolver.TypeCastRules;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -220,7 +235,13 @@ public class JoinUtils {
       if (currentrel instanceof DrillAggregateRel) {
         agg = (DrillAggregateRel)currentrel;
       } else if (currentrel instanceof RelSubset) {
-        currentrel = ((RelSubset)currentrel).getBest() ;
+        currentrel = ((RelSubset) currentrel).getBest() ;
+      } else if (currentrel instanceof DrillLimitRel) {
+        // TODO: Improve this check when DRILL-5691 is fixed.
+        // The problem is that RelMdMaxRowCount currently cannot be used
+        // due to CALCITE-1048.
+        Integer fetchValue = ((RexLiteral) ((DrillLimitRel) currentrel).getFetch()).getValueAs(Integer.class);
+        return fetchValue != null && fetchValue <= 1;
       } else if (currentrel.getInputs().size() == 1) {
         // If the rel is not an aggregate or RelSubset, but is a single-input rel (could be Project,
         // Filter, Sort etc.), check its input
@@ -234,6 +255,17 @@ public class JoinUtils {
       if (agg.getGroupSet().isEmpty()) {
         return true;
       }
+      // Checks that the group by contains a single expression and that it is a literal.
+      // When Calcite rewrites EXISTS sub-queries using SubQueryRemoveRule rules,
+      // it creates a project with a TRUE literal in its expression list and an aggregate on top
+      // of it with an empty call list and the literal from the project expression in its group set.
+      if (agg.getAggCallList().isEmpty() && agg.getGroupSet().cardinality() == 1) {
+        ProjectExpressionsCollector expressionsCollector = new ProjectExpressionsCollector();
+        agg.accept(expressionsCollector);
+        List<RexNode> projectedExpressions = expressionsCollector.getProjectedExpressions();
+        return projectedExpressions.size() == 1
+            && RexUtil.isLiteral(projectedExpressions.get(agg.getGroupSet().nth(0)), true);
+      }
     }
     return false;
   }
@@ -267,4 +299,75 @@ public class JoinUtils {
     return isScalarSubquery(left) || isScalarSubquery(right);
   }
 
+  /**
+   * Collects the list of expressions from the input project.
+   * When the input rel node has a single input, that input is examined instead.
+   */
+  private static class ProjectExpressionsCollector extends RelShuttleImpl {
+    private final List<RexNode> expressions = new ArrayList<>();
+
+    @Override
+    public RelNode visit(RelNode other) {
+      // RelShuttleImpl doesn't have visit methods for Project and RelSubset.
+      if (other instanceof RelSubset) {
+        return visit((RelSubset) other);
+      } else if (other instanceof Project) {
+        return visit((Project) other);
+      }
+      return super.visit(other);
+    }
+
+    @Override
+    public RelNode visit(TableFunctionScan scan) {
+      return scan;
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+      return join;
+    }
+
+    @Override
+    public RelNode visit(LogicalCorrelate correlate) {
+      return correlate;
+    }
+
+    @Override
+    public RelNode visit(LogicalUnion union) {
+      return union;
+    }
+
+    @Override
+    public RelNode visit(LogicalIntersect intersect) {
+      return intersect;
+    }
+
+    @Override
+    public RelNode visit(LogicalMinus minus) {
+      return minus;
+    }
+
+    @Override
+    public RelNode visit(LogicalSort sort) {
+      return sort;
+    }
+
+    @Override
+    public RelNode visit(LogicalExchange exchange) {
+      return exchange;
+    }
+
+    private RelNode visit(Project project) {
+      expressions.addAll(project.getProjects());
+      return project;
+    }
+
+    private RelNode visit(RelSubset subset) {
+      return Util.first(subset.getBest(), subset.getOriginal()).accept(this);
+    }
+
+    public List<RexNode> getProjectedExpressions() {
+      return expressions;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 2a79751..b78d76c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -113,6 +113,16 @@ public enum PlannerPhase {
     }
   },
 
+  SUBQUERY_REWRITE("Sub-queries rewrites") {
+    public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
+      return RuleSets.ofList(
+          RuleInstance.SUB_QUERY_FILTER_REMOVE_RULE,
+          RuleInstance.SUB_QUERY_PROJECT_REMOVE_RULE,
+          RuleInstance.SUB_QUERY_JOIN_REMOVE_RULE
+      );
+    }
+  },
+
   LOGICAL_PRUNE("Logical Planning (with partition pruning)") {
     public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
       return PlannerPhase.mergedRuleSets(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 80bbe88..8aec96c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -40,6 +40,7 @@ import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
 import org.apache.calcite.rel.rules.ReduceExpressionsRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
+import org.apache.calcite.rel.rules.SubQueryRemoveRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import org.apache.drill.exec.planner.logical.DrillConditions;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
@@ -130,4 +131,13 @@ public interface RuleInstance {
 
   FilterRemoveIsNotDistinctFromRule REMOVE_IS_NOT_DISTINCT_FROM_RULE =
       new FilterRemoveIsNotDistinctFromRule(DrillRelBuilder.proto(DrillRelFactories.DRILL_LOGICAL_FILTER_FACTORY));
+
+  SubQueryRemoveRule SUB_QUERY_FILTER_REMOVE_RULE =
+      new SubQueryRemoveRule.SubQueryFilterRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
+
+  SubQueryRemoveRule SUB_QUERY_PROJECT_REMOVE_RULE =
+      new SubQueryRemoveRule.SubQueryProjectRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
+
+  SubQueryRemoveRule SUB_QUERY_JOIN_REMOVE_RULE =
+      new SubQueryRemoveRule.SubQueryJoinRemoveRule(DrillRelFactories.LOGICAL_BUILDER);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
index f3c6ce0..cd696ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/PreProcessLogicalRel.java
@@ -32,8 +32,6 @@ import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.planner.sql.parser.DrillCalciteWrapperUtility;
 import org.apache.drill.exec.util.ApproximateStringMatcher;
 import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
@@ -46,7 +44,6 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlSingleValueAggFunction;
 import org.apache.calcite.util.NlsString;
 
 /**
@@ -79,19 +76,6 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
   }
 
   @Override
-  public RelNode visit(LogicalAggregate aggregate) {
-    for(AggregateCall aggregateCall : aggregate.getAggCallList()) {
-      if(aggregateCall.getAggregation() instanceof SqlSingleValueAggFunction) {
-        unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.FUNCTION,
-            "Non-scalar sub-query used in an expression\n" +
-            "See Apache Drill JIRA: DRILL-1937");
-        throw new UnsupportedOperationException();
-      }
-    }
-    return visitChild(aggregate, 0, aggregate.getInput());
-  }
-
-  @Override
   public RelNode visit(LogicalProject project) {
     final List<RexNode> projExpr = Lists.newArrayList();
     for(RexNode rexNode : project.getChildExps()) {
@@ -168,7 +152,7 @@ public class PreProcessLogicalRel extends RelShuttleImpl {
       exprList.add(newExpr);
     }
 
-    if (rewrite == true) {
+    if (rewrite) {
       LogicalProject newProject = project.copy(project.getTraitSet(), project.getInput(0), exprList, project.getRowType());
       return visitChild(newProject, 0, project.getInput());
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index b8659d1..3f65ad2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -64,7 +64,6 @@ import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
-import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.Util;
@@ -384,10 +383,7 @@ public class SqlConverter {
     //To avoid unexpected column errors set a value of top to false
     final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false);
     final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
-    final RelRoot rel3 = rel2.withRel(
-        RelDecorrelator.decorrelateQuery(rel2.rel,
-            sqlToRelConverterConfig.getRelBuilderFactory().create(cluster, null)));
-    return rel3;
+    return rel2;
   }
 
   private class Expander implements RelOptTable.ViewExpander {
@@ -478,7 +474,7 @@ public class SqlConverter {
 
     @Override
     public boolean isExpand() {
-      return SqlToRelConverterConfig.DEFAULT.isExpand();
+      return false;
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 98e017f..1e671ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -55,6 +55,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.sql2rel.RelDecorrelator;
 import org.apache.calcite.tools.Program;
 import org.apache.calcite.tools.Programs;
 import org.apache.calcite.tools.RelConversionException;
@@ -78,6 +79,7 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.cost.DrillDefaultRelMetadataProvider;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
 import org.apache.drill.exec.planner.logical.PreProcessLogicalRel;
@@ -658,10 +660,16 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     return typedSqlNode;
   }
 
-  private RelNode convertToRel(SqlNode node) throws RelConversionException {
+  private RelNode convertToRel(SqlNode node) {
     final RelNode convertedNode = config.getConverter().toRel(node).rel;
     log("INITIAL", convertedNode, logger, null);
-    return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, convertedNode);
+    RelNode transformedNode = transform(PlannerType.HEP,
+        PlannerPhase.SUBQUERY_REWRITE, convertedNode);
+
+    RelNode decorrelatedNode = RelDecorrelator.decorrelateQuery(transformedNode,
+        DrillRelFactories.LOGICAL_BUILDER.create(transformedNode.getCluster(), null));
+
+    return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, decorrelatedNode);
   }
 
   private RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
index 47f0ae8..d40540e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCorrelation.java
@@ -58,4 +58,50 @@ public class TestCorrelation extends PlanTestBase {
       .run();
   }
 
+  @Test
+  public void testExistsScalarSubquery() throws Exception {
+    String query =
+        "SELECT employee_id\n" +
+        "FROM cp.`employee.json`\n" +
+        "WHERE EXISTS\n" +
+        "    (SELECT *\n" +
+        "     FROM cp.`employee.json` cs2\n" +
+        "     )\n" +
+        "limit 1";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("employee_id")
+        .baselineValues(1L)
+        .go();
+  }
+
+  @Test
+  public void testSeveralExistsCorrelateSubquery() throws Exception {
+    String query =
+        "SELECT cs1.employee_id\n" +
+        "FROM cp.`employee.json` cs1,\n" +
+        "     cp.`employee.json` cs3\n" +
+        "WHERE cs1.hire_date = cs3.hire_date\n" +
+        "  AND EXISTS\n" +
+        "    (SELECT *\n" +
+        "     FROM cp.`employee.json` cs2\n" +
+        "     WHERE " +
+        "       cs1.position_id > cs2.position_id\n" +
+        "       AND" +
+        "       cs1.epmloyee_id = cs2.epmloyee_id" +
+        "       )\n" +
+        "  AND EXISTS\n" +
+        "    (SELECT *\n" +
+        "     FROM cp.`employee.json` cr1\n" +
+        "     WHERE cs1.position_id = cr1.position_id)\n" +
+        "LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .expectsEmptyResultSet()
+      .go();
+  }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
index 781fce3..679d01e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDisabledFunctionality.java
@@ -47,30 +47,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
     throw ex;
   }
 
-  @Test(expected = UnsupportedFunctionException.class)  // see DRILL-1937
-  public void testDisabledExplainplanForComparisonWithNonscalarSubquery() throws Exception {
-    try {
-      test("explain plan for select n_name from cp.`tpch/nation.parquet` " +
-           "where n_nationkey = " +
-           "(select r_regionkey from cp.`tpch/region.parquet` " +
-           "where r_regionkey = 1)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
-  @Test(expected = UnsupportedFunctionException.class)  // see DRILL-1937
-  public void testDisabledComparisonWithNonscalarSubquery() throws Exception {
-    try {
-      test("select n_name from cp.`tpch/nation.parquet` " +
-           "where n_nationkey = " +
-           "(select r_regionkey from cp.`tpch/region.parquet` " +
-           "where r_regionkey = 1)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-1921
   public void testDisabledIntersect() throws Exception {
     try {
@@ -215,19 +191,6 @@ public class TestDisabledFunctionality extends BaseTestQuery {
     }
   }
 
-  @Test(expected = UnsupportedFunctionException.class) // see DRILL-1325, DRILL-2155, see DRILL-1937
-  public void testMultipleUnsupportedOperatorations() throws Exception {
-    try {
-      test("select a.lastname, b.n_name " +
-          "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " +
-          "where b.n_nationkey = " +
-          "(select r_regionkey from cp.`tpch/region.parquet` " +
-          "where r_regionkey = 1)");
-    } catch(UserException ex) {
-      throwAsUnsupportedException(ex);
-    }
-  }
-
   @Test(expected = UnsupportedRelOperatorException.class) // see DRILL-2068, DRILL-1325
   public void testExplainPlanForCartesianJoin() throws Exception {
     try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 3e17a7a..adc8e35 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1165,4 +1165,37 @@ public class TestExampleQueries extends BaseTestQuery {
         .build()
         .run();
   }
+
+  @Test // Compares a column with a sub-query used as a single value; checks the plan text and the returned row.
+  public void testComparisonWithSingleValueSubQuery() throws Exception {
+    String query = "select n_name from cp.`tpch/nation.parquet` " +
+        "where n_nationkey = " +
+        "(select r_regionkey from cp.`tpch/region.parquet` " + // sub-query on the right-hand side of "="
+        "where r_regionkey = 1)";
+    PlanTestBase.testPlanMatchingPatterns(query, // expected plan patterns (regular expressions) are listed below
+        new String[]{"agg.*SINGLE_VALUE", "Filter.*=\\(\\$0, 1\\)"}); // a SINGLE_VALUE aggregate and an "=($0, 1)" filter
+
+    testBuilder() // now run the same query and compare its output with the baseline
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("n_name")
+        .baselineValues("ARGENTINA") // single expected row; presumably the nation with n_nationkey = 1 — confirm against the data
+        .go();
+  }
+
+  @Test // Same single-value sub-query comparison, applied on top of a comma join of two tables.
+  public void testMultipleComparisonWithSingleValueSubQuery() throws Exception {
+    String query = "select a.last_name, b.n_name " +
+        "from cp.`employee.json` a, cp.`tpch/nation.parquet` b " + // comma join with no condition between a and b
+        "where b.n_nationkey = " + // only table b is restricted, by the sub-query value
+        "(select r_regionkey from cp.`tpch/region.parquet` " +
+        "where r_regionkey = 1) limit 1"; // "limit 1" applies to the outer query
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("last_name", "n_name")
+        .baselineValues("Nowmer", "ARGENTINA") // NOTE(review): no ORDER BY, so the row kept by "limit 1" depends on row order — confirm it is stable
+        .go();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
index 54f7250..fd97564 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java
@@ -138,7 +138,6 @@ public class TestTpchDistributed extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void tpch21() throws Exception{
     testDistributed("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
index 56287c9..ed242be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
@@ -145,7 +145,6 @@ public class TestTpchDistributedStreaming extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void tpch21() throws Exception{
     testDistributed("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
index 3cbe5ef..1aa9de2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
@@ -148,7 +148,6 @@ public class TestTpchExplain extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void tpch21() throws Exception{
     doExplain("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
index 9400c7c..51aea18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchLimit0.java
@@ -140,7 +140,6 @@ public class TestTpchLimit0 extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void tpch21() throws Exception{
     testLimitZero("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
index 1745301..f995ce6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchPlanning.java
@@ -141,7 +141,6 @@ public class TestTpchPlanning extends PlanningBase {
   }
 
   @Test
-  @Ignore // DRILL-519
   public void tpch21() throws Exception {
     testSqlPlanFromFile("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
index 2863bbb..a3e34c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchSingleMode.java
@@ -139,7 +139,6 @@ public class TestTpchSingleMode extends BaseTestQuery {
   }
 
   @Test
-  @Ignore
   public void tpch21() throws Exception{
     testSingleMode("queries/tpch/21.sql");
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 0a5b7cd..445b18e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -25,6 +25,9 @@ import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.Text;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.PlanTestBase;
@@ -37,23 +40,32 @@ import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.math.BigDecimal;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @Category({SqlFunctionTest.class, OperatorTest.class, PlannerTest.class})
 public class TestAggregateFunctions extends BaseTestQuery {
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @BeforeClass
   public static void setupFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("agg"));
@@ -521,6 +533,7 @@ public class TestAggregateFunctions extends BaseTestQuery {
         .go();
 
   }
+
   @Test
   public void minMaxEmptyNonNullableInput() throws Exception {
     // test min and max functions on required type
@@ -567,6 +580,73 @@ public class TestAggregateFunctions extends BaseTestQuery {
     }
   }
 
+  @Test // For each table: read one row, then check that single_value(col) over a one-row input returns the same values.
+  public void testSingleValueFunction() throws Exception {
+    List<String> tableNames = ImmutableList.of(
+        "cp.`parquet/alltypes_required.parquet`",
+        "cp.`parquet/alltypes_optional.parquet`");
+    for (String tableName : tableNames) {
+      final QueryDataBatch result = // NOTE(review): only element 0 of the returned list is used and released below — confirm no other batch needs release
+          testSqlWithResults(String.format("select * from %s limit 1", tableName)).get(0);
+
+      final Map<String, StringBuilder> functions = new HashMap<>(); // function name -> select list being built for it
+      functions.put("single_value", new StringBuilder()); // single_value is the only function registered here
+
+      final Map<String, Object> resultingValues = new HashMap<>(); // expected record: back-quoted column name -> value
+      final List<String> columns = new ArrayList<>(); // plain column names, in container iteration order
+
+      final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+      loader.load(result.getHeader().getDef(), result.getData()); // load the batch so its vectors can be read
+
+      for (VectorWrapper<?> vectorWrapper : loader.getContainer()) { // one wrapper per column
+        final String fieldName = vectorWrapper.getField().getName();
+        Object object = vectorWrapper.getValueVector().getAccessor().getObject(0); // value at index 0
+        // VarCharVector returns Text instance, but baseline values should contain String value
+        if (object instanceof Text) {
+          object = object.toString();
+        }
+        resultingValues.put(String.format("`%s`", fieldName), object); // key is the column name wrapped in back-quotes
+        for (Map.Entry<String, StringBuilder> function : functions.entrySet()) {
+          function.getValue() // appends "<function>(<column>) <column>," to this function's select list
+              .append(function.getKey())
+              .append("(")
+              .append(fieldName)
+              .append(") ")
+              .append(fieldName) // the column name is repeated as the output alias
+              .append(",");
+        }
+        columns.add(fieldName);
+      }
+      loader.clear(); // values were copied into resultingValues above; loader and batch are not used again
+      result.release();
+
+      String columnsList = columns.stream() // comma-separated column names for the inner select
+          .collect(Collectors.joining(", "));
+
+      final List<Map<String, Object>> baselineRecords = new ArrayList<>(); // baseline holds exactly one record
+      baselineRecords.add(resultingValues);
+
+      for (StringBuilder selectBody : functions.values()) {
+        selectBody.setLength(selectBody.length() - 1); // drop the trailing comma
+
+        testBuilder()
+            .sqlQuery("select %s from (select %s from %s limit 1)", // aggregate over a sub-query limited to one row
+                selectBody.toString(), columnsList, tableName)
+            .unOrdered()
+            .baselineRecords(baselineRecords)
+            .go();
+      }
+    }
+  }
+
+  @Test // Expects single_value over a whole table to fail with a "more than one row" function error.
+  public void testSingleValueWithMultipleValuesInput() throws Exception {
+    thrown.expect(UserRemoteException.class); // expectations are registered before the query runs
+    thrown.expectMessage(containsString("FUNCTION ERROR")); // the message is expected to contain this fragment...
+    thrown.expectMessage(containsString("Input for single_value function has more than one row")); // ...and this one
+    test("select single_value(n_name) from cp.`tpch/nation.parquet`"); // no limit or filter; presumably the table has several rows
+  }
+
   /*
    * Streaming agg on top of a filter produces wrong results if the first two batches are filtered out.
    * In the below test we have three files in the input directory and since the ordering of reading
diff --git a/pom.xml b/pom.xml
index 242b134..f69288e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
     <parquet.version>1.10.0</parquet.version>
-    <calcite.version>1.16.0-drill-r3</calcite.version>
+    <calcite.version>1.16.0-drill-r4</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
     <sqlline.version>1.1.9-drill-r7</sqlline.version>