Posted to commits@hive.apache.org by kg...@apache.org on 2020/05/22 06:54:58 UTC

[hive] branch master updated: HIVE-23434: Add option to rewrite PERCENTILE_DISC to sketch functions (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 716f1f9  HIVE-23434: Add option to rewrite PERCENTILE_DISC to sketch functions (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
716f1f9 is described below

commit 716f1f9a945a9a11e6702754667660d27e0a5cf4
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Fri May 22 06:54:20 2020 +0000

    HIVE-23434: Add option to rewrite PERCENTILE_DISC to sketch functions (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
    
    Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   9 +-
 .../test/resources/testconfiguration.properties    |   4 +-
 .../hadoop/hive/ql/exec/DataSketchesFunctions.java |  20 +-
 .../HiveRewriteCountDistinctToDataSketches.java    | 175 ----------
 .../rules/HiveRewriteToDataSketchesRules.java      | 371 +++++++++++++++++++++
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |  14 +-
 .../sketches_materialized_view_percentile_disc.q   |  54 +++
 ...rewrite.q => sketches_rewrite_count_distinct.q} |   0
 ...ewrite.q => sketches_rewrite_percentile_disc.q} |   9 +-
 ...etches_materialized_view_percentile_disc.q.out} | 280 ++++++++--------
 .../llap/sketches_materialized_view_rollup2.q.out  |   8 +-
 .../llap/sketches_materialized_view_safety.q.out   |   2 +-
 ...q.out => sketches_rewrite_count_distinct.q.out} |   2 +-
 ....out => sketches_rewrite_percentile_disc.q.out} |  64 ++--
 14 files changed, 643 insertions(+), 369 deletions(-)
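
As a quick sketch of what this change does (the sketch_input table and the
rewritten form below come from the new q-files and the class javadoc; the
actual transformation happens on the Calcite plan, not on SQL text):

    -- original query
    SELECT PERCENTILE_DISC(0.2) WITHIN GROUP (ORDER BY id) FROM sketch_input;
    -- is rewritten, when BI mode is enabled, roughly to
    SELECT ds_kll_quantile(ds_kll_sketch(CAST(id AS FLOAT)), 0.2) FROM sketch_input;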

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bd884a9..a00d907 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2492,12 +2492,19 @@ public class HiveConf extends Configuration {
     HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED("hive.optimize.bi.rewrite.countdistinct.enabled",
         true,
         "Enables to rewrite COUNT(DISTINCT(X)) queries to be rewritten to use sketch functions."),
-
     HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH(
         "hive.optimize.bi.rewrite.countdistinct.sketch", "hll",
         new StringSet("hll"),
         "Defines which sketch type to use when rewriting COUNT(DISTINCT(X)) expressions. "
             + "Distinct counting can be done with: hll"),
+    HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_ENABLED("hive.optimize.bi.rewrite.percentile_disc.enabled",
+        true,
+        "Enables to rewrite PERCENTILE_DISC(X) queries to be rewritten to use sketch functions."),
+    HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_SKETCH(
+        "hive.optimize.bi.rewrite.percentile_disc.sketch", "kll",
+        new StringSet("kll"),
+        "Defines which sketch type to use when rewriting PERCENTILE_DISC expressions. Options: kll"),
 
     // Statistics
     HIVE_STATS_ESTIMATE_STATS("hive.stats.estimate", true,
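
The new options are gated the same way as the existing count-distinct ones: a
rewrite only fires when hive.optimize.bi.enabled is on as well. A minimal
session-level setup (all settings taken from this diff and the q-files; 'kll'
is currently the only accepted sketch type):

    set hive.optimize.bi.enabled=true;
    set hive.optimize.bi.rewrite.percentile_disc.enabled=true;
    set hive.optimize.bi.rewrite.percentile_disc.sketch=kll;
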
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e7c3e43..0d06d02 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -872,9 +872,11 @@ minillaplocal.query.files=\
   schq_ingest.q,\
   sketches_hll.q,\
   sketches_theta.q,\
-  sketches_rewrite.q,\
+  sketches_rewrite_count_distinct.q,\
+  sketches_rewrite_percentile_disc.q,\
   sketches_materialized_view_rollup.q,\
   sketches_materialized_view_rollup2.q,\
+  sketches_materialized_view_percentile_disc.q,\
   sketches_materialized_view_safety.q,\
   table_access_keys_stats.q,\
   temp_table_llap_partitioned.q,\
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
index 8865380..cc48d5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
@@ -65,7 +65,7 @@ public final class DataSketchesFunctions implements HiveUDFPlugin {
   private static final String GET_CDF = "cdf";
   private static final String GET_PMF = "pmf";
   private static final String GET_QUANTILES = "quantiles";
-  private static final String GET_QUANTILE = "quantile";
+  public static final String GET_QUANTILE = "quantile";
   private static final String GET_RANK = "rank";
   private static final String INTERSECT_SKETCH = "intersect";
   private static final String INTERSECT_SKETCH1 = "intersect_f";
@@ -109,7 +109,8 @@ public final class DataSketchesFunctions implements HiveUDFPlugin {
     }
     SketchDescriptor sc = sketchClasses.get(className);
     if (!sc.fnMap.containsKey(function)) {
-      throw new IllegalArgumentException(String.format("The Sketch-class '%s' doesn't have a '%s' method", function));
+      throw new IllegalArgumentException(
+          String.format("The Sketch-class '%s' doesn't have a '%s' method", className, function));
     }
     return sketchClasses.get(className).fnMap.get(function);
   }
@@ -128,6 +129,7 @@ public final class DataSketchesFunctions implements HiveUDFPlugin {
       SketchFunctionDescriptor sketchSFD = sd.fnMap.get(DATA_TO_SKETCH);
       SketchFunctionDescriptor unionSFD = sd.fnMap.get(UNION_SKETCH);
       SketchFunctionDescriptor estimateSFD = sd.fnMap.get(SKETCH_TO_ESTIMATE);
+      SketchFunctionDescriptor quantileSFD = sd.fnMap.get(GET_QUANTILE);
 
       if (sketchSFD == null || unionSFD == null) {
         continue;
@@ -163,6 +165,20 @@ public final class DataSketchesFunctions implements HiveUDFPlugin {
 
         estimateSFD.setCalciteFunction(estimateFn);
       }
+
+      if (quantileSFD != null && quantileSFD.getReturnRelDataType().isPresent()) {
+        SqlFunction quantileFn = new HiveSqlFunction(quantileSFD.name,
+            SqlKind.OTHER_FUNCTION,
+            ReturnTypes.explicit(quantileSFD.getReturnRelDataType().get().getSqlTypeName()),
+            InferTypes.ANY_NULLABLE,
+            OperandTypes.family(),
+            SqlFunctionCategory.USER_DEFINED_FUNCTION,
+            true,
+            false);
+
+        quantileSFD.setCalciteFunction(quantileFn);
+      }
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
deleted file mode 100644
index c23e2c4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
-import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.
- *
- * The transformation here works on Aggregate nodes; the operations done are the following:
- *
- * 1. Identify candidate {@code count(distinct)} aggregate calls
- * 2. A new Aggregate is created in which the aggregation is done by the sketch function
- * 3. A new Project is inserted on top of the Aggregate; which unwraps the resulting
- *    count-distinct estimation from the sketch representation
- */
-public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule {
-
-  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class);
-  private final String sketchClass;
-  private final ProjectFactory projectFactory;
-
-  public HiveRewriteCountDistinctToDataSketches(String sketchClass) {
-    super(operand(HiveAggregate.class, any()));
-    this.sketchClass = sketchClass;
-    projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY;
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final Aggregate aggregate = call.rel(0);
-
-    if (aggregate.getGroupSets().size() != 1) {
-      // not yet supported
-      return;
-    }
-
-    List<AggregateCall> newAggCalls = new ArrayList<AggregateCall>();
-
-    VBuilder vb = new VBuilder(aggregate);
-
-    if (aggregate.getAggCallList().equals(vb.newAggCalls)) {
-      // rule didn't made any changes
-      return;
-    }
-
-    newAggCalls = vb.newAggCalls;
-    RelNode newAgg = aggregate.copy(aggregate.getTraitSet(), aggregate.getInput(), aggregate.getGroupSet(),
-        aggregate.getGroupSets(), newAggCalls);
-
-    RelNode newProject = projectFactory.createProject(newAgg, vb.newProjects, aggregate.getRowType().getFieldNames());
-
-    call.transformTo(newProject);
-    return;
-  }
-
-  /**
-   * Helper class to help in building a new Aggregate and Project.
-   */
-  // NOTE: methods in this class are not re-entrant; drop-to-frame to constructor during debugging
-  class VBuilder {
-
-    private Aggregate aggregate;
-    private List<AggregateCall> newAggCalls;
-    private List<RexNode> newProjects;
-    private final RexBuilder rexBuilder;
-
-    public VBuilder(Aggregate aggregate) {
-      this.aggregate = aggregate;
-      newAggCalls = new ArrayList<AggregateCall>();
-      newProjects = new ArrayList<RexNode>();
-      rexBuilder = aggregate.getCluster().getRexBuilder();
-
-      // add non-aggregated fields - as identity projections
-      addGroupFields();
-
-      for (AggregateCall aggCall : aggregate.getAggCallList()) {
-        processAggCall(aggCall);
-      }
-    }
-
-    private void addGroupFields() {
-      for (int i = 0; i < aggregate.getGroupCount(); i++) {
-        newProjects.add(rexBuilder.makeInputRef(aggregate, 0));
-      }
-    }
-
-    private void processAggCall(AggregateCall aggCall) {
-      if (isSimpleCountDistinct(aggCall)) {
-        rewriteCountDistinct(aggCall);
-        return;
-      }
-      appendAggCall(aggCall, null);
-    }
-
-    private void appendAggCall(AggregateCall aggCall, SqlOperator projectOperator) {
-      RelDataType origType = aggregate.getRowType().getFieldList().get(newProjects.size()).getType();
-      RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), newProjects.size());
-      if (projectOperator != null) {
-        projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex));
-        projRex = rexBuilder.makeCast(origType, projRex);
-      }
-      newAggCalls.add(aggCall);
-      newProjects.add(projRex);
-    }
-
-    private boolean isSimpleCountDistinct(AggregateCall aggCall) {
-      return aggCall.isDistinct() && aggCall.getArgList().size() == 1
-          && aggCall.getAggregation().getName().equalsIgnoreCase("count") && !aggCall.hasFilter();
-    }
-
-    private void rewriteCountDistinct(AggregateCall aggCall) {
-      SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
-      boolean distinct = false;
-      boolean approximate = true;
-      boolean ignoreNulls = aggCall.ignoreNulls();
-      List<Integer> argList = aggCall.getArgList();
-      int filterArg = aggCall.filterArg;
-      RelCollation collation = aggCall.getCollation();
-      int groupCount = aggregate.getGroupCount();
-      RelNode input = aggregate.getInput();
-      RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList());
-      String name = aggFunction.getName();
-
-      AggregateCall ret = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg,
-          collation, groupCount, input, type, name);
-
-      appendAggCall(ret, getSqlOperator(DataSketchesFunctions.SKETCH_TO_ESTIMATE));
-    }
-
-    private SqlOperator getSqlOperator(String fnName) {
-      UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
-      if (!fn.getCalciteFunction().isPresent()) {
-        throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it");
-      }
-      return fn.getCalciteFunction().get();
-    }
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
new file mode 100644
index 0000000..0123137
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteToDataSketchesRules.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * These rules rewrite aggregate calls to be calculated using sketch-based functions.
+ *
+ * <br/>
+ * Currently it can rewrite:
+ * <ul>
+ *  <li>{@code count(distinct(x))} to distinct counting sketches
+ *    <pre>
+ *     SELECT COUNT(DISTINCT id) FROM sketch_input;
+ *       ⇒ SELECT ROUND(ds_hll_estimate(ds_hll_sketch(id))) FROM sketch_input;
+ *    </pre>
+ *  </li>
+ *  <li>{@code percentile_disc(0.2) within group (order by id)}
+ *    <pre>
+ *     SELECT PERCENTILE_DISC(0.2) WITHIN GROUP(ORDER BY ID) FROM sketch_input;
+ *       ⇒ SELECT ds_kll_quantile(ds_kll_sketch(CAST(id AS FLOAT)), 0.2) FROM sketch_input;
+ *    </pre>
+ *  </li>
+ *  </ul>
+ *
+ * <p>
+ *   The transformation works on Aggregate nodes; the operations performed are the following:
+ * </p>
+ * <ol>
+ * <li>Identify candidate aggregate calls</li>
+ * <li>A new Project is inserted below the Aggregate to help with data pre-processing</li>
+ * <li>A new Aggregate is created in which the aggregation is done by the sketch function</li>
+ * <li>A new Project is inserted on top of the Aggregate, which unwraps the
+ *    resulting estimation from the sketch representation</li>
+ * </ol>
+ */
+public final class HiveRewriteToDataSketchesRules {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteToDataSketchesRules.class);
+
+  /**
+   * Generic support for rewriting an Aggregate into a chain of Project->Aggregate->Project.
+   */
+  private static abstract class AggregateToProjectAggregateProject extends RelOptRule {
+
+    private final ProjectFactory projectFactory;
+
+    public AggregateToProjectAggregateProject(RelOptRuleOperand operand) {
+      super(operand);
+      projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      VbuilderPAP vb = processCall(call);
+      if (vb == null) {
+        return;
+      }
+
+      Aggregate aggregate = vb.aggregate;
+      if (aggregate.getAggCallList().equals(vb.newAggCalls)) {
+        // rule didn't make any changes
+        return;
+      }
+
+      List<AggregateCall> newAggCalls = vb.newAggCalls;
+      List<String> fieldNames = new ArrayList<String>();
+      for (int i = 0; i < vb.newProjectsBelow.size(); i++) {
+        fieldNames.add("ff_" + i);
+      }
+      RelNode newProjectBelow = projectFactory.createProject(aggregate.getInput(), vb.newProjectsBelow, fieldNames);
+
+      RelNode newAgg = aggregate.copy(aggregate.getTraitSet(), newProjectBelow, aggregate.getGroupSet(),
+          aggregate.getGroupSets(), newAggCalls);
+
+      RelNode newProject =
+          projectFactory.createProject(newAgg, vb.newProjectsAbove, aggregate.getRowType().getFieldNames());
+
+      call.transformTo(newProject);
+    }
+
+    protected abstract VbuilderPAP processCall(RelOptRuleCall call);
+
+    private static abstract class VbuilderPAP {
+      protected final RexBuilder rexBuilder;
+
+      /** The original aggregate RelNode */
+      protected final Aggregate aggregate;
+      /** The list of the new aggregations */
+      protected final List<AggregateCall> newAggCalls;
+      /**
+       * The new projection expressions inserted above the aggregate
+       *
+       *  These projections should do the necessary conversions to behave like the original aggregate.
+       *  Most importantly, the final result must be CAST to the same type the original aggregate produced.
+       */
+      protected final List<RexNode> newProjectsAbove;
+      /** The new projection expressions inserted below the aggregate
+       *
+       * These projections could be used to preprocess the incoming data stream.
+       * For example, a CAST might need to be injected.
+       */
+      protected final List<RexNode> newProjectsBelow;
+
+      private final String sketchClass;
+
+      protected VbuilderPAP(Aggregate aggregate, String sketchClass) {
+        this.aggregate = aggregate;
+        this.sketchClass = sketchClass;
+        newAggCalls = new ArrayList<AggregateCall>();
+        newProjectsAbove = new ArrayList<RexNode>();
+        newProjectsBelow = new ArrayList<RexNode>();
+        rexBuilder = aggregate.getCluster().getRexBuilder();
+      }
+
+      protected final void processAggregate() {
+        // add identity projections
+        addProjectedFields();
+
+        for (AggregateCall aggCall : aggregate.getAggCallList()) {
+          processAggCall(aggCall);
+        }
+      }
+
+      private final void addProjectedFields() {
+        for (int i = 0; i < aggregate.getGroupCount(); i++) {
+          newProjectsAbove.add(rexBuilder.makeInputRef(aggregate, i));
+        }
+        int numInputFields = aggregate.getInput().getRowType().getFieldCount();
+        for (int i = 0; i < numInputFields; i++) {
+          newProjectsBelow.add(rexBuilder.makeInputRef(aggregate.getInput(), i));
+        }
+      }
+
+      private final void processAggCall(AggregateCall aggCall) {
+        if (isApplicable(aggCall)) {
+          rewrite(aggCall);
+        } else {
+          appendAggCall(aggCall);
+        }
+      }
+
+      private final void appendAggCall(AggregateCall aggCall) {
+        RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), newProjectsAbove.size());
+
+        newAggCalls.add(aggCall);
+        newProjectsAbove.add(projRex);
+      }
+
+      protected final SqlOperator getSqlOperator(String fnName) {
+        UDFDescriptor fn = DataSketchesFunctions.INSTANCE.getSketchFunction(sketchClass, fnName);
+        if (!fn.getCalciteFunction().isPresent()) {
+          throw new RuntimeException(fn.toString() + " doesn't have a Calcite function associated with it");
+        }
+        return fn.getCalciteFunction().get();
+      }
+
+      abstract void rewrite(AggregateCall aggCall);
+
+      abstract boolean isApplicable(AggregateCall aggCall);
+
+    }
+
+  }
+
+  public static class CountDistinctRewrite extends AggregateToProjectAggregateProject {
+
+    private final String sketchType;
+
+    public CountDistinctRewrite(String sketchType) {
+      super(operand(HiveAggregate.class, any()));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    protected VBuilderPAP processCall(RelOptRuleCall call) {
+      final Aggregate aggregate = call.rel(0);
+
+      if (aggregate.getGroupSets().size() != 1) {
+        // not yet supported
+        return null;
+      }
+
+      return new VBuilderPAP(aggregate, sketchType);
+    }
+
+    private static class VBuilderPAP extends AggregateToProjectAggregateProject.VbuilderPAP {
+
+      protected VBuilderPAP(Aggregate aggregate, String sketchClass) {
+        super(aggregate, sketchClass);
+        processAggregate();
+      }
+
+      @Override
+      boolean isApplicable(AggregateCall aggCall) {
+        return aggCall.isDistinct() && aggCall.getArgList().size() == 1
+            && aggCall.getAggregation().getKind() == SqlKind.COUNT && !aggCall.hasFilter();
+      }
+
+      @Override
+      void rewrite(AggregateCall aggCall) {
+        RelDataType origType = aggregate.getRowType().getFieldList().get(newProjectsAbove.size()).getType();
+
+        Integer argIndex = aggCall.getArgList().get(0);
+        RexNode call = rexBuilder.makeInputRef(aggregate.getInput(), argIndex);
+        newProjectsBelow.add(call);
+
+        SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        boolean distinct = false;
+        boolean approximate = true;
+        boolean ignoreNulls = true;
+        List<Integer> argList = Lists.newArrayList(newProjectsBelow.size() - 1);
+        int filterArg = aggCall.filterArg;
+        RelCollation collation = aggCall.getCollation();
+        RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList());
+        String name = aggFunction.getName();
+
+        AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg,
+            collation, type, name);
+
+        SqlOperator projectOperator = getSqlOperator(DataSketchesFunctions.SKETCH_TO_ESTIMATE);
+        RexNode projRex = rexBuilder.makeInputRef(newAgg.getType(), newProjectsAbove.size());
+        projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex));
+        projRex = rexBuilder.makeCall(SqlStdOperatorTable.ROUND, ImmutableList.of(projRex));
+        projRex = rexBuilder.makeCast(origType, projRex);
+
+        newAggCalls.add(newAgg);
+        newProjectsAbove.add(projRex);
+      }
+    }
+  }
+
+  public static class PercentileDiscRewrite extends AggregateToProjectAggregateProject {
+
+    private final String sketchType;
+
+    public PercentileDiscRewrite(String sketchType) {
+      super(operand(HiveAggregate.class, operand(HiveProject.class, any())));
+      this.sketchType = sketchType;
+    }
+
+    @Override
+    protected VBuilderPAP processCall(RelOptRuleCall call) {
+      final Aggregate aggregate = call.rel(0);
+      final Project project = call.rel(1);
+
+      if (aggregate.getGroupSets().size() != 1) {
+        // not yet supported
+        return null;
+      }
+
+      return new VBuilderPAP(aggregate, project, sketchType);
+    }
+
+    private static class VBuilderPAP extends AggregateToProjectAggregateProject.VbuilderPAP {
+
+      private final Project aggInput;
+
+      protected VBuilderPAP(Aggregate aggregate, Project project, String sketchClass) {
+        super(aggregate, sketchClass);
+        aggInput = project;
+        processAggregate();
+      }
+
+      @Override
+      boolean isApplicable(AggregateCall aggCall) {
+        if ((aggInput instanceof Project)
+            && !aggCall.isDistinct() && aggCall.getArgList().size() == 4
+            && aggCall.getAggregation().getName().equalsIgnoreCase("percentile_disc")
+            && !aggCall.hasFilter()) {
+          List<Integer> argList = aggCall.getArgList();
+          RexNode orderLiteral = aggInput.getChildExps().get(argList.get(2));
+          if (orderLiteral.isA(SqlKind.LITERAL)) {
+            RexLiteral lit = (RexLiteral) orderLiteral;
+            return BigDecimal.valueOf(1).equals(lit.getValue());
+          }
+        }
+        return false;
+      }
+
+      @Override
+      void rewrite(AggregateCall aggCall) {
+        RelDataType origType = aggregate.getRowType().getFieldList().get(newProjectsAbove.size()).getType();
+
+        Integer argIndex = aggCall.getArgList().get(1);
+        RexNode call = rexBuilder.makeInputRef(aggregate.getInput(), argIndex);
+
+        RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory();
+        RelDataType notNullFloatType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+        RelDataType floatType = typeFactory.createTypeWithNullability(notNullFloatType, true);
+
+        call = rexBuilder.makeCast(floatType, call);
+        newProjectsBelow.add(call);
+
+        SqlAggFunction aggFunction = (SqlAggFunction) getSqlOperator(DataSketchesFunctions.DATA_TO_SKETCH);
+        boolean distinct = false;
+        boolean approximate = true;
+        boolean ignoreNulls = true;
+        List<Integer> argList = Lists.newArrayList(newProjectsBelow.size() - 1);
+        int filterArg = aggCall.filterArg;
+        RelCollation collation = aggCall.getCollation();
+        RelDataType type = rexBuilder.deriveReturnType(aggFunction, Collections.emptyList());
+        String name = aggFunction.getName();
+
+        AggregateCall newAgg = AggregateCall.create(aggFunction, distinct, approximate, ignoreNulls, argList, filterArg,
+            collation, type, name);
+
+        Integer origFractionIdx = aggCall.getArgList().get(0);
+        RexNode fraction = aggInput.getChildExps().get(origFractionIdx);
+        fraction = rexBuilder.makeCast(floatType, fraction);
+
+        SqlOperator projectOperator = getSqlOperator(DataSketchesFunctions.GET_QUANTILE);
+        RexNode projRex = rexBuilder.makeInputRef(newAgg.getType(), newProjectsAbove.size());
+        projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex, fraction));
+        projRex = rexBuilder.makeCast(origType, projRex);
+
+        newAggCalls.add(newAgg);
+        newProjectsAbove.add(projRex);
+      }
+    }
+  }
+}
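
To make the Project->Aggregate->Project chain from the class javadoc concrete,
here is an approximate SQL rendering of what PercentileDiscRewrite builds for
the grouped case (the alias names sk and t are illustrative only; the rule
operates on RelNodes, and the exact plan is visible in the q.out below):

    -- original
    SELECT category, PERCENTILE_DISC(0.2) WITHIN GROUP (ORDER BY id)
    FROM sketch_input GROUP BY category;

    -- approximate rewritten shape: the lower Project casts the input to FLOAT,
    -- the Aggregate builds the KLL sketch, and the upper Project extracts the
    -- requested quantile and casts it back to the original result type
    SELECT category, CAST(ds_kll_quantile(sk, 0.2) AS DOUBLE)
    FROM (SELECT category, ds_kll_sketch(CAST(id AS FLOAT)) AS sk
          FROM sketch_input GROUP BY category) t;
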
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 32ad4c1..377e828 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -92,7 +92,6 @@ import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.rules.FilterMergeRule;
 import org.apache.calcite.rel.rules.JoinToMultiJoinRule;
 import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
@@ -238,7 +237,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelDecorrelator;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRemoveGBYSemiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRemoveSqCountCheck;
-import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRewriteCountDistinctToDataSketches;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRewriteToDataSketchesRules;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortJoinReduceRule;
@@ -1975,9 +1974,14 @@ public class CalcitePlanner extends SemanticAnalyzer {
       if (!isMaterializedViewMaintenance() && conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_ENABLED)) {
         // Rewrite to datasketches if enabled
         if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNTDISTINCT_ENABLED)) {
-          String sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH);
-          generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN,
-              new HiveRewriteCountDistinctToDataSketches(sketchClass));
+          String countDistinctSketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_COUNT_DISTINCT_SKETCH);
+          RelOptRule rule = new HiveRewriteToDataSketchesRules.CountDistinctRewrite(countDistinctSketchType);
+          generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, rule);
+        }
+        if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_ENABLED)) {
+          String percentileDiscSketchType = conf.getVar(ConfVars.HIVE_OPTIMIZE_BI_REWRITE_PERCENTILE_DISC_SKETCH);
+          RelOptRule rule = new HiveRewriteToDataSketchesRules.PercentileDiscRewrite(percentileDiscSketchType);
+          generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN, rule);
         }
       }
       // Run this optimization early, since it is expanding the operator pipeline.
diff --git a/ql/src/test/queries/clientpositive/sketches_materialized_view_percentile_disc.q b/ql/src/test/queries/clientpositive/sketches_materialized_view_percentile_disc.q
new file mode 100644
index 0000000..4578f4f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/sketches_materialized_view_percentile_disc.q
@@ -0,0 +1,54 @@
+--! qt:transactional
+set hive.fetch.task.conversion=none;
+
+create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true');
+
+insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+; 
+
+-- create an mv for the intermediate results
+create  materialized view mv_1 as
+  select category, ds_kll_sketch(cast(id as float)) from sketch_input group by category;
+
+-- bi mode on
+set hive.optimize.bi.enabled=true;
+
+explain
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+
+set hive.optimize.bi.enabled=false;
+
+explain
+select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+
+set hive.optimize.bi.enabled=true;
+
+insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+;
+
+explain
+select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+
+explain
+alter materialized view mv_1 rebuild;
+alter materialized view mv_1 rebuild;
+
+explain
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category;
+
+-- rewrite+mv matching with rollup
+explain
+select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input;
+select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input;
+
+drop materialized view mv_1;
diff --git a/ql/src/test/queries/clientpositive/sketches_rewrite.q b/ql/src/test/queries/clientpositive/sketches_rewrite_count_distinct.q
similarity index 100%
copy from ql/src/test/queries/clientpositive/sketches_rewrite.q
copy to ql/src/test/queries/clientpositive/sketches_rewrite_count_distinct.q
diff --git a/ql/src/test/queries/clientpositive/sketches_rewrite.q b/ql/src/test/queries/clientpositive/sketches_rewrite_percentile_disc.q
similarity index 68%
rename from ql/src/test/queries/clientpositive/sketches_rewrite.q
rename to ql/src/test/queries/clientpositive/sketches_rewrite_percentile_disc.q
index 0420d62..aade878 100644
--- a/ql/src/test/queries/clientpositive/sketches_rewrite.q
+++ b/ql/src/test/queries/clientpositive/sketches_rewrite_percentile_disc.q
@@ -1,6 +1,5 @@
 --! qt:transactional
 
-set hive.optimize.bi.enabled=true;
 
 create table sketch_input (id int, category char(1))
 STORED AS ORC
@@ -11,9 +10,13 @@ insert into table sketch_input values
   (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
 ; 
 
+select percentile_disc(0.3) within group(order by id) from sketch_input;
+
+set hive.optimize.bi.enabled=true;
+
 -- see if rewrite happens
 explain
-select category, count(distinct id) from sketch_input group by category;
+select percentile_disc(0.3) within group(order by id) from sketch_input;
 
-select category, count(distinct id) from sketch_input group by category;
+select percentile_disc(0.3) within group(order by id) from sketch_input;
 
diff --git a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_percentile_disc.q.out
similarity index 66%
copy from ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out
copy to ql/src/test/results/clientpositive/llap/sketches_materialized_view_percentile_disc.q.out
index e7b3c0e..1752199 100644
--- a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out
+++ b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_percentile_disc.q.out
@@ -25,25 +25,25 @@ POSTHOOK: Output: default@sketch_input
 POSTHOOK: Lineage: sketch_input.category SCRIPT []
 POSTHOOK: Lineage: sketch_input.id SCRIPT []
 PREHOOK: query: create  materialized view mv_1 as
-  select category, ds_hll_sketch(id),count(id) from sketch_input group by category
+  select category, ds_kll_sketch(cast(id as float)) from sketch_input group by category
 PREHOOK: type: CREATE_MATERIALIZED_VIEW
 PREHOOK: Input: default@sketch_input
 PREHOOK: Output: database:default
 PREHOOK: Output: default@mv_1
 POSTHOOK: query: create  materialized view mv_1 as
-  select category, ds_hll_sketch(id),count(id) from sketch_input group by category
+  select category, ds_kll_sketch(cast(id as float)) from sketch_input group by category
 POSTHOOK: type: CREATE_MATERIALIZED_VIEW
 POSTHOOK: Input: default@sketch_input
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@mv_1
 PREHOOK: query: explain
-select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
@@ -61,9 +61,9 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: default.mv_1
-                  Statistics: Num rows: 2 Data size: 362 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 410 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(ds_hll_estimate(_c1)) (type: bigint)
+                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToDouble(ds_kll_quantile(_c1, 0.2)) (type: double)
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 2 Data size: 394 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
@@ -82,25 +82,25 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+PREHOOK: query: select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+POSTHOOK: query: select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-rewrite; mv matching	a	10
-rewrite; mv matching	b	10
+rewrite; mv matching	a	2.0
+rewrite; mv matching	b	7.0
 PREHOOK: query: explain
-select 'no rewrite; no mv usage', category, count(distinct id) from sketch_input group by category
+select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select 'no rewrite; no mv usage', category, count(distinct id) from sketch_input group by category
+select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
@@ -122,52 +122,45 @@ STAGE PLANS:
                   alias: sketch_input
                   Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: id (type: int), category (type: char(1))
-                    outputColumnNames: id, category
+                    expressions: category (type: char(1)), id (type: int)
+                    outputColumnNames: _col0, _col2
                     Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      keys: category (type: char(1)), id (type: int)
-                      minReductionHashAggr: 0.3181818
+                      aggregations: percentile_disc(0.2, _col2, 1, 0)
+                      keys: _col0 (type: char(1))
+                      minReductionHashAggr: 0.9090909
                       mode: hash
                       outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 11 Data size: 979 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 2 Data size: 1746 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: char(1)), _col1 (type: int)
-                        null sort order: zz
-                        sort order: ++
+                        key expressions: _col0 (type: char(1))
+                        null sort order: z
+                        sort order: +
                         Map-reduce partition columns: _col0 (type: char(1))
-                        Statistics: Num rows: 11 Data size: 979 Basic stats: COMPLETE Column stats: COMPLETE
-            Execution mode: vectorized, llap
+                        Statistics: Num rows: 2 Data size: 1746 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: struct<counts:map<bigint,bigint>,percentiles:array<double>,isascending:boolean>)
+            Execution mode: llap
             LLAP IO: may be used (ACID table)
         Reducer 2 
-            Execution mode: vectorized, llap
+            Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                keys: KEY._col0 (type: char(1)), KEY._col1 (type: int)
+                aggregations: percentile_disc(VALUE._col0)
+                keys: KEY._col0 (type: char(1))
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 11 Data size: 979 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: _col1 (type: int), _col0 (type: char(1))
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 11 Data size: 979 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: count(_col0)
-                    keys: _col1 (type: char(1))
-                    mode: complete
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: 'no rewrite; no mv usage' (type: string), _col0 (type: char(1)), _col1 (type: bigint)
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 2 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 2 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE
-                        table:
-                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  expressions: 'no rewrite; no mv usage' (type: string), _col0 (type: char(1)), _col1 (type: double)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 400 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -175,16 +168,16 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select 'no rewrite; no mv usage', category, count(distinct id) from sketch_input group by category
+PREHOOK: query: select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select 'no rewrite; no mv usage', category, count(distinct id) from sketch_input group by category
+POSTHOOK: query: select 'no rewrite; no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-no rewrite; no mv usage	a	10
-no rewrite; no mv usage	b	10
+no rewrite; no mv usage	a	2.0
+no rewrite; no mv usage	b	7.0
 PREHOOK: query: insert into table sketch_input values
   (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
   (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
@@ -200,12 +193,12 @@ POSTHOOK: Output: default@sketch_input
 POSTHOOK: Lineage: sketch_input.category SCRIPT []
 POSTHOOK: Lineage: sketch_input.id SCRIPT []
 PREHOOK: query: explain
-select 'rewrite; but no mv usage', category, count(distinct id) from sketch_input group by category
+select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select 'rewrite; but no mv usage', category, count(distinct id) from sketch_input group by category
+select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
@@ -227,36 +220,36 @@ STAGE PLANS:
                   alias: sketch_input
                   Statistics: Num rows: 44 Data size: 3916 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: id (type: int), category (type: char(1))
-                    outputColumnNames: id, category
+                    expressions: category (type: char(1)), UDFToFloat(id) (type: float)
+                    outputColumnNames: _col0, _col1
                     Statistics: Num rows: 44 Data size: 3916 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: ds_hll_sketch(id)
-                      keys: category (type: char(1))
+                      aggregations: ds_kll_sketch(_col1)
+                      keys: _col0 (type: char(1))
                       minReductionHashAggr: 0.95454544
                       mode: hash
                       outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         key expressions: _col0 (type: char(1))
                         null sort order: z
                         sort order: +
                         Map-reduce partition columns: _col0 (type: char(1))
-                        Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>)
+                        Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: binary)
             Execution mode: llap
             LLAP IO: may be used (ACID table)
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: ds_hll_sketch(VALUE._col0)
+                aggregations: ds_kll_sketch(VALUE._col0)
                 keys: KEY._col0 (type: char(1))
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: 'rewrite; but no mv usage' (type: string), _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
+                  expressions: 'rewrite; but no mv usage' (type: string), _col0 (type: char(1)), UDFToDouble(ds_kll_quantile(_col1, 0.2)) (type: double)
                   outputColumnNames: _col0, _col1, _col2
                   Statistics: Num rows: 2 Data size: 402 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
@@ -273,16 +266,16 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select 'rewrite; but no mv usage', category, count(distinct id) from sketch_input group by category
+PREHOOK: query: select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select 'rewrite; but no mv usage', category, count(distinct id) from sketch_input group by category
+POSTHOOK: query: select 'rewrite; but no mv usage', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-rewrite; but no mv usage	a	10
-rewrite; but no mv usage	b	10
+rewrite; but no mv usage	a	2.0
+rewrite; but no mv usage	b	7.0
 PREHOOK: query: explain
 alter materialized view mv_1 rebuild
 PREHOOK: type: QUERY
@@ -323,120 +316,116 @@ STAGE PLANS:
                     predicate: (ROW__ID.writeid > 1L) (type: boolean)
                     Statistics: Num rows: 14 Data size: 1246 Basic stats: COMPLETE Column stats: COMPLETE
                     Select Operator
-                      expressions: id (type: int), category (type: char(1))
-                      outputColumnNames: id, category
+                      expressions: category (type: char(1)), UDFToFloat(id) (type: float)
+                      outputColumnNames: _col0, _col1
                       Statistics: Num rows: 14 Data size: 1246 Basic stats: COMPLETE Column stats: COMPLETE
                       Group By Operator
-                        aggregations: ds_hll_sketch(id), count(id)
-                        keys: category (type: char(1))
+                        aggregations: ds_kll_sketch(_col1)
+                        keys: _col0 (type: char(1))
                         minReductionHashAggr: 0.85714287
                         mode: hash
-                        outputColumnNames: _col0, _col1, _col2
-                        Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
+                        outputColumnNames: _col0, _col1
+                        Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                         Reduce Output Operator
                           key expressions: _col0 (type: char(1))
                           null sort order: z
                           sort order: +
                           Map-reduce partition columns: _col0 (type: char(1))
-                          Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
-                          value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>), _col2 (type: bigint)
+                          Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                          value expressions: _col1 (type: binary)
             Execution mode: llap
             LLAP IO: may be used (ACID table)
         Map 6 
             Map Operator Tree:
                 TableScan
                   alias: default.mv_1
-                  Statistics: Num rows: 2 Data size: 378 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 410 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: category (type: char(1)), _c1 (type: binary), _c2 (type: bigint)
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 2 Data size: 378 Basic stats: COMPLETE Column stats: COMPLETE
+                    expressions: category (type: char(1)), _c1 (type: binary)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2 Data size: 410 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: ds_hll_union(_col1), sum(_col2)
+                      aggregations: ds_kll_union(_col1)
                       keys: _col0 (type: char(1))
                       minReductionHashAggr: 0.5
                       mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         key expressions: _col0 (type: char(1))
                         null sort order: z
                         sort order: +
                         Map-reduce partition columns: _col0 (type: char(1))
-                        Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>), _col2 (type: bigint)
+                        Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: binary)
             Execution mode: llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: ds_hll_sketch(VALUE._col0), count(VALUE._col1)
+                aggregations: ds_kll_sketch(VALUE._col0)
                 keys: KEY._col0 (type: char(1))
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 2 Data size: 474 Basic stats: COMPLETE Column stats: COMPLETE
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                 Group By Operator
-                  aggregations: ds_hll_union(_col1), sum(_col2)
+                  aggregations: ds_kll_union(_col1)
                   keys: _col0 (type: char(1))
                   minReductionHashAggr: 0.5
                   mode: hash
-                  outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
                     key expressions: _col0 (type: char(1))
                     null sort order: z
                     sort order: +
                     Map-reduce partition columns: _col0 (type: char(1))
-                    Statistics: Num rows: 2 Data size: 962 Basic stats: COMPLETE Column stats: COMPLETE
-                    value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>), _col2 (type: bigint)
+                    Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                    value expressions: _col1 (type: binary)
         Reducer 4 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: ds_hll_union(VALUE._col0), sum(VALUE._col1)
+                aggregations: ds_kll_union(VALUE._col0)
                 keys: KEY._col0 (type: char(1))
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 2 Data size: 474 Basic stats: COMPLETE Column stats: COMPLETE
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.mv_1
                 Select Operator
-                  expressions: _col0 (type: char(1)), _col1 (type: binary), COALESCE(_col2,0L) (type: bigint)
-                  outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 2 Data size: 474 Basic stats: COMPLETE Column stats: COMPLETE
-                  File Output Operator
-                    compressed: false
-                    Statistics: Num rows: 2 Data size: 474 Basic stats: COMPLETE Column stats: COMPLETE
-                    table:
-                        input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-                        serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                        name: default.mv_1
-                  Select Operator
-                    expressions: _col0 (type: char(1)), _col1 (type: binary), _col2 (type: bigint)
-                    outputColumnNames: category, _c1, _c2
-                    Statistics: Num rows: 2 Data size: 474 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: compute_stats(category, 'hll'), compute_stats(_c1, 'hll'), compute_stats(_c2, 'hll')
-                      minReductionHashAggr: 0.5
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 1 Data size: 1152 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        null sort order: 
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 1152 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col0 (type: struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>), _col1 (type: struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint>), _col2 (type: struct<columntype:string,min:bigint,max:bigint,countnulls:bigint,bitvector:binary>)
+                  expressions: _col0 (type: char(1)), _col1 (type: binary)
+                  outputColumnNames: category, _c1
+                  Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                  Group By Operator
+                    aggregations: compute_stats(category, 'hll'), compute_stats(_c1, 'hll')
+                    minReductionHashAggr: 0.5
+                    mode: hash
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 728 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      null sort order: 
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 728 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col0 (type: struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint,bitvector:binary>), _col1 (type: struct<columntype:string,maxlength:bigint,sumlength:bigint,count:bigint,countnulls:bigint>)
         Reducer 5 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: compute_stats(VALUE._col0), compute_stats(VALUE._col1), compute_stats(VALUE._col2)
+                aggregations: compute_stats(VALUE._col0), compute_stats(VALUE._col1)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 1 Data size: 1152 Basic stats: COMPLETE Column stats: COMPLETE
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 1 Data size: 712 Basic stats: COMPLETE Column stats: COMPLETE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 1 Data size: 1152 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 1 Data size: 712 Basic stats: COMPLETE Column stats: COMPLETE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
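
The rebuild plan above merges sketches from two sources: Map 1 recomputes ds_kll_sketch over the base table while Map 6 feeds the sketches already stored in mv_1, and the reducers combine both streams with ds_kll_union. Since sketch union is associative and commutative, the same rebuild could be written by hand roughly as below (a minimal sketch; the float cast mirrors the UDFToFloat call in the plan):

    select category, ds_kll_union(sk)
    from (
      select category, _c1 as sk from mv_1              -- sketches already materialized
      union all
      select category, ds_kll_sketch(cast(id as float)) as sk
      from sketch_input
      group by category                                 -- fresh sketches from the base table
    ) u
    group by category;
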
@@ -461,8 +450,8 @@ STAGE PLANS:
     Stats Work
       Basic Stats Work:
       Column Stats Desc:
-          Columns: category, _c1, _c2
-          Column Types: char(1), binary, bigint
+          Columns: category, _c1
+          Column Types: char(1), binary
           Table: default.mv_1
 
   Stage: Stage-4
@@ -481,16 +470,15 @@ POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
 POSTHOOK: Output: default@mv_1
 POSTHOOK: Lineage: mv_1._c1 EXPRESSION [(sketch_input)sketch_input.FieldSchema(name:id, type:int, comment:null), (mv_1)default.mv_1.FieldSchema(name:_c1, type:binary, comment:null), ]
-POSTHOOK: Lineage: mv_1._c2 EXPRESSION [(sketch_input)sketch_input.FieldSchema(name:id, type:int, comment:null), (mv_1)default.mv_1.FieldSchema(name:_c2, type:bigint, comment:null), ]
 POSTHOOK: Lineage: mv_1.category EXPRESSION [(sketch_input)sketch_input.FieldSchema(name:category, type:char(1), comment:null), (mv_1)default.mv_1.FieldSchema(name:category, type:char(1), comment:null), ]
 PREHOOK: query: explain
-select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
@@ -508,9 +496,9 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: default.mv_1
-                  Statistics: Num rows: 2 Data size: 362 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 490 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(ds_hll_estimate(_c1)) (type: bigint)
+                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToDouble(ds_kll_quantile(_c1, 0.2)) (type: double)
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 2 Data size: 394 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
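
With the MV in place, the rewritten query never touches sketch_input: the plan above only scans default.mv_1 and evaluates the quantile on the stored sketch. A hand-written equivalent, assuming the sketch column keeps the autogenerated name _c1 shown in the lineage output:

    select 'rewrite; mv matching', category,
           ds_kll_quantile(_c1, 0.2)   -- 0.2 quantile straight from the stored KLL sketch
    from mv_1;
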
@@ -529,26 +517,26 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+PREHOOK: query: select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select 'rewrite; mv matching', category, count(distinct id) from sketch_input group by category
+POSTHOOK: query: select 'rewrite; mv matching', category, percentile_disc(0.2) within group (order by id) from sketch_input group by category
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-rewrite; mv matching	a	10
-rewrite; mv matching	b	10
+rewrite; mv matching	a	2.0
+rewrite; mv matching	b	7.0
 PREHOOK: query: explain
-select 'rewrite;mv matching with rollup',count(distinct id) from sketch_input
+select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select 'rewrite;mv matching with rollup',count(distinct id) from sketch_input
+select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
@@ -569,34 +557,34 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: default.mv_1
-                  Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 2 Data size: 320 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: _c1 (type: binary)
                     outputColumnNames: _c1
-                    Statistics: Num rows: 2 Data size: 192 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 2 Data size: 320 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: ds_hll_union(_c1)
+                      aggregations: ds_kll_union(_c1)
                       minReductionHashAggr: 0.5
                       mode: hash
                       outputColumnNames: _col0
-                      Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE
+                      Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
                         null sort order: 
                         sort order: 
-                        Statistics: Num rows: 1 Data size: 388 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col0 (type: struct<lgk:int,type:string,sketch:binary>)
+                        Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: binary)
             Execution mode: llap
             LLAP IO: all inputs
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: ds_hll_union(VALUE._col0)
+                aggregations: ds_kll_union(VALUE._col0)
                 mode: mergepartial
                 outputColumnNames: _col0
                 Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: 'rewrite;mv matching with rollup' (type: string), UDFToLong(ds_hll_estimate(_col0)) (type: bigint)
+                  expressions: 'rewrite;mv matching with rollup' (type: string), UDFToDouble(ds_kll_quantile(_col0, 0.2)) (type: double)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1 Data size: 123 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
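
For the global (ungrouped) percentile the plan rolls the MV up instead: the per-category sketches are first merged with ds_kll_union and only then queried for the quantile. A minimal hand-written form of that rollup:

    select 'rewrite;mv matching with rollup',
           ds_kll_quantile(ds_kll_union(_c1), 0.2)   -- merge per-category sketches, then extract the quantile
    from mv_1;
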
@@ -613,17 +601,17 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select 'rewrite;mv matching with rollup',count(distinct id) from sketch_input
+PREHOOK: query: select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mv_1
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select 'rewrite;mv matching with rollup',count(distinct id) from sketch_input
+POSTHOOK: query: select 'rewrite;mv matching with rollup',percentile_disc(0.2) within group (order by id) from sketch_input
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@mv_1
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-rewrite;mv matching with rollup	15
+rewrite;mv matching with rollup	4.0
 PREHOOK: query: drop materialized view mv_1
 PREHOOK: type: DROP_MATERIALIZED_VIEW
 PREHOOK: Input: default@mv_1
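
For reference, a materialized view of the shape these plans rely on could be declared roughly as below. This is a hypothetical reconstruction from the lineage and plan output (the authoritative definition lives in sketches_materialized_view_percentile_disc.q); the float cast matches the UDFToFloat(id) call on the map side:

    create materialized view mv_1 as
    select category, ds_kll_sketch(cast(id as float))   -- stored as the binary column _c1
    from sketch_input
    group by category;
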
diff --git a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out
index e7b3c0e..d9b72b0 100644
--- a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out
+++ b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_rollup2.q.out
@@ -63,7 +63,7 @@ STAGE PLANS:
                   alias: default.mv_1
                   Statistics: Num rows: 2 Data size: 362 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(ds_hll_estimate(_c1)) (type: bigint)
+                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(round(ds_hll_estimate(_c1))) (type: bigint)
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 2 Data size: 394 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
@@ -256,7 +256,7 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: 'rewrite; but no mv usage' (type: string), _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
+                  expressions: 'rewrite; but no mv usage' (type: string), _col0 (type: char(1)), UDFToLong(round(ds_hll_estimate(_col1))) (type: bigint)
                   outputColumnNames: _col0, _col1, _col2
                   Statistics: Num rows: 2 Data size: 402 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
@@ -510,7 +510,7 @@ STAGE PLANS:
                   alias: default.mv_1
                   Statistics: Num rows: 2 Data size: 362 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(ds_hll_estimate(_c1)) (type: bigint)
+                    expressions: 'rewrite; mv matching' (type: string), category (type: char(1)), UDFToLong(round(ds_hll_estimate(_c1))) (type: bigint)
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 2 Data size: 394 Basic stats: COMPLETE Column stats: COMPLETE
                     File Output Operator
@@ -596,7 +596,7 @@ STAGE PLANS:
                 outputColumnNames: _col0
                 Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: 'rewrite;mv matching with rollup' (type: string), UDFToLong(ds_hll_estimate(_col0)) (type: bigint)
+                  expressions: 'rewrite;mv matching with rollup' (type: string), UDFToLong(round(ds_hll_estimate(_col0))) (type: bigint)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1 Data size: 123 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
diff --git a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_safety.q.out b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_safety.q.out
index 959edfc..126aee6 100644
--- a/ql/src/test/results/clientpositive/llap/sketches_materialized_view_safety.q.out
+++ b/ql/src/test/results/clientpositive/llap/sketches_materialized_view_safety.q.out
@@ -399,7 +399,7 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: 'rewritten;mv not used' (type: string), _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
+                  expressions: 'rewritten;mv not used' (type: string), _col0 (type: char(1)), UDFToLong(round(ds_hll_estimate(_col1))) (type: bigint)
                   outputColumnNames: _col0, _col1, _col2
                   Statistics: Num rows: 2 Data size: 396 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
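
The hunks in sketches_materialized_view_rollup2.q.out and sketches_materialized_view_safety.q.out capture one behavioral refinement: ds_hll_estimate returns a double, and the final projection now rounds it before the bigint cast instead of truncating. The difference is easiest to see on a bare double (the literal below is a hypothetical estimate value):

    select cast(9.99 as bigint),          -- 9:  a plain cast truncates toward zero
           cast(round(9.99) as bigint);   -- 10: rounding first recovers the nearest count
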
diff --git a/ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out b/ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out
similarity index 99%
copy from ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
copy to ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out
index dedcff9..8c556ef 100644
--- a/ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
+++ b/ql/src/test/results/clientpositive/llap/sketches_rewrite_count_distinct.q.out
@@ -81,7 +81,7 @@ STAGE PLANS:
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
+                  expressions: _col0 (type: char(1)), UDFToLong(round(ds_hll_estimate(_col1))) (type: bigint)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
diff --git a/ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out b/ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_disc.q.out
similarity index 62%
rename from ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
rename to ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_disc.q.out
index dedcff9..d6c6911 100644
--- a/ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
+++ b/ql/src/test/results/clientpositive/llap/sketches_rewrite_percentile_disc.q.out
@@ -24,13 +24,22 @@ POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@sketch_input
 POSTHOOK: Lineage: sketch_input.category SCRIPT []
 POSTHOOK: Lineage: sketch_input.id SCRIPT []
+PREHOOK: query: select percentile_disc(0.3) within group(order by id) from sketch_input
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+POSTHOOK: query: select percentile_disc(0.3) within group(order by id) from sketch_input
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+6.0
 PREHOOK: query: explain
-select category, count(distinct id) from sketch_input group by category
+select percentile_disc(0.3) within group(order by id) from sketch_input
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
 POSTHOOK: query: explain
-select category, count(distinct id) from sketch_input group by category
+select percentile_disc(0.3) within group(order by id) from sketch_input
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
@@ -43,50 +52,46 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan
                   alias: sketch_input
-                  Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 22 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: id (type: int), category (type: char(1))
-                    outputColumnNames: id, category
-                    Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                    expressions: UDFToFloat(id) (type: float)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 22 Data size: 88 Basic stats: COMPLETE Column stats: COMPLETE
                     Group By Operator
-                      aggregations: ds_hll_sketch(id)
-                      keys: category (type: char(1))
-                      minReductionHashAggr: 0.9090909
+                      aggregations: ds_kll_sketch(_col0)
+                      minReductionHashAggr: 0.95454544
                       mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: char(1))
-                        null sort order: z
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: char(1))
-                        Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>)
+                        null sort order: 
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: binary)
             Execution mode: llap
             LLAP IO: may be used (ACID table)
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
               Group By Operator
-                aggregations: ds_hll_sketch(VALUE._col0)
-                keys: KEY._col0 (type: char(1))
+                aggregations: ds_kll_sketch(VALUE._col0)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 144 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
-                  expressions: _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
+                  expressions: UDFToDouble(ds_kll_quantile(_col0, 0.3)) (type: double)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                     table:
                         input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
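
The plan above is the rewritten form of the percentile_disc query: the map side builds a KLL sketch over UDFToFloat(id), and the reducer extracts the 0.3 quantile from the merged sketch. Written out by hand it would look roughly like this (a sketch, assuming the datasketches UDFs are registered under the names used in the plan):

    select ds_kll_quantile(ds_kll_sketch(cast(id as float)), 0.3)   -- approximate percentile_disc(0.3)
    from sketch_input;
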
@@ -98,13 +103,12 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select category, count(distinct id) from sketch_input group by category
+PREHOOK: query: select percentile_disc(0.3) within group(order by id) from sketch_input
 PREHOOK: type: QUERY
 PREHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-POSTHOOK: query: select category, count(distinct id) from sketch_input group by category
+POSTHOOK: query: select percentile_disc(0.3) within group(order by id) from sketch_input
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@sketch_input
 #### A masked pattern was here ####
-a	10
-b	10
+6.0