You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/04/24 04:58:50 UTC

[GitHub] [hive] jcamachor commented on a change in pull request #988: HIVE-23031 rewrite distinct

jcamachor commented on a change in pull request #988:
URL: https://github.com/apache/hive/pull/988#discussion_r414266001



##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -2465,6 +2465,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +
         "before executing the main query block. -1 will disable this feature."),
 
+    HIVE_OPTIMIZE_REWRITE_COUNTDISTINCT_ENABLED("hive.optimize.sketches.rewrite.countdistintct.enabled", false,

Review comment:
       Let's prefix all of them with `hive.optimize.bi`.
   
   Additionally, let's create a general toggle for all of them (`hive.optimize.bi.sketches.rewrite.enabled`?) that is `false` by default. Then individual ones such as `hive.optimize.bi.sketches.rewrite.countdistintct.enabled` are by default `true`.
   The idea is that users can enable the feature with a single change in their property values, and they disable selectively some of the transformations in case there are bugs, want to test anything else, etc.

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -2465,6 +2465,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +
         "before executing the main query block. -1 will disable this feature."),
 
+    HIVE_OPTIMIZE_REWRITE_COUNTDISTINCT_ENABLED("hive.optimize.sketches.rewrite.countdistintct.enabled", false,
+        "Enables to rewrite COUNT(DISTINCT(X)) queries to be rewritten to use sketch functions."),
+
+    HIVE_OPTIMIZE_REWRITE_COUNT_DISTINCT_SKETCHCLASS("hive.optimize.sketches.rewrite.countdistintct.sketchclass", "hll",

Review comment:
       Let's limit the sketch classes options with `StringSet` with those that are valid.
   
   Additionally, can we add a comment in the description about what a 'sketch class' means?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -128,14 +141,26 @@ private void buildCalciteFns() {
           OperandTypes.family(),
           unionFn);
 
+
       unionSFD.setCalciteFunction(unionFn);
       sketchSFD.setCalciteFunction(sketchFn);
+      if (estimateSFD != null) {
+        SqlFunction estimateFn = new HiveSqlFunction(estimateSFD.name,
+            SqlKind.OTHER_FUNCTION,
+            ReturnTypes.explicit(SqlTypeName.DOUBLE),

Review comment:
       If this is a UDF, we should probably dynamically generate the return type from it as we do for other UDFs?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.
+ */
+public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class);
+  private String sketchClass;
+
+  public HiveRewriteCountDistinctToDataSketches(HiveConf conf) {
+    super(operand(HiveAggregate.class, any()));
+    sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNT_DISTINCT_SKETCHCLASS);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+
+    if (aggregate.getGroupSets().size() != 1) {
+      // not yet supported
+      return;
+    }
+
+    List<AggregateCall> newAggCalls = new ArrayList<AggregateCall>();
+
+    AggregateFactory f = HiveRelFactories.HIVE_AGGREGATE_FACTORY;

Review comment:
       I guess you are not passing the builder because it would incur a penalty on every rule instantiation?
   That is perfect but maybe we can set these factories in the constructor so the rest remains generic? Additionally, we could pass the `sketchClass` enum directly instead of the full HiveConf.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/DataSketchesFunctions.java
##########
@@ -96,19 +97,31 @@ private DataSketchesFunctions() {
     return descriptors;
   }
 
+  public SketchFunctionDescriptor getSketchFunction(String className, String function) {

Review comment:
       Let's make className an `enum`, it will be neat.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.
+ */
+public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class);
+  private String sketchClass;
+
+  public HiveRewriteCountDistinctToDataSketches(HiveConf conf) {
+    super(operand(HiveAggregate.class, any()));
+    sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNT_DISTINCT_SKETCHCLASS);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+
+    if (aggregate.getGroupSets().size() != 1) {
+      // not yet supported
+      return;
+    }
+
+    List<AggregateCall> newAggCalls = new ArrayList<AggregateCall>();
+
+    AggregateFactory f = HiveRelFactories.HIVE_AGGREGATE_FACTORY;

Review comment:
       It seems this factory is never used?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.
+ */
+public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class);
+  private String sketchClass;
+
+  public HiveRewriteCountDistinctToDataSketches(HiveConf conf) {
+    super(operand(HiveAggregate.class, any()));
+    sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNT_DISTINCT_SKETCHCLASS);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+
+    if (aggregate.getGroupSets().size() != 1) {
+      // not yet supported
+      return;
+    }
+
+    List<AggregateCall> newAggCalls = new ArrayList<AggregateCall>();
+
+    AggregateFactory f = HiveRelFactories.HIVE_AGGREGATE_FACTORY;
+
+    VBuilder vb = new VBuilder(aggregate);
+
+    ProjectFactory projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY;
+
+    if (aggregate.getAggCallList().equals(vb.newAggCalls)) {
+      // rule didn't made any changes
+      return;
+    }
+
+    newAggCalls = vb.newAggCalls;
+    // FIXME HiveAggregate?
+    RelNode newAgg = aggregate.copy(aggregate.getTraitSet(), aggregate.getInput(), aggregate.getGroupSet(),
+        aggregate.getGroupSets(), newAggCalls);
+
+    RelNode newProject = projectFactory.createProject(newAgg, vb.newProjects, aggregate.getRowType().getFieldNames());
+
+    call.transformTo(newProject);
+    return;
+  }
+
+  /**
+   * Helper class to help in building a new Aggregate and Project.
+   */
+  // NOTE: methods in this class are not re-entrant; drop-to-frame to constructor during debugging
+  class VBuilder {
+
+    private Aggregate aggregate;
+    private List<AggregateCall> newAggCalls;
+    private List<RexNode> newProjects;
+    private final RexBuilder rexBuilder;
+
+    public VBuilder(Aggregate aggregate) {
+      this.aggregate = aggregate;
+      newAggCalls = new ArrayList<AggregateCall>();
+      newProjects = new ArrayList<RexNode>();
+      rexBuilder = aggregate.getCluster().getRexBuilder();
+
+      // add non-aggregated fields - as identity projections
+      addGroupFields();
+
+      for (AggregateCall aggCall : aggregate.getAggCallList()) {
+        processAggCall(aggCall);
+      }
+    }
+
+    private void addGroupFields() {
+      for (int i = 0; i < aggregate.getGroupCount(); i++) {
+        newProjects.add(rexBuilder.makeInputRef(aggregate, 0));
+      }
+    }
+
+    private void processAggCall(AggregateCall aggCall) {
+      if (isSimpleCountDistinct(aggCall)) {
+        rewriteCountDistinct(aggCall);
+        return;
+      }
+      appendAggCall(aggCall, null);
+    }
+
+    private void appendAggCall(AggregateCall aggCall, SqlOperator projectOperator) {
+      RelDataType origType = aggregate.getRowType().getFieldList().get(newProjects.size()).getType();
+      RexNode projRex = rexBuilder.makeInputRef(aggCall.getType(), newProjects.size());
+      if (projectOperator != null) {
+        projRex = rexBuilder.makeCall(projectOperator, ImmutableList.of(projRex));
+        projRex = rexBuilder.makeCast(origType, projRex);
+      }
+      newAggCalls.add(aggCall);
+      newProjects.add(projRex);
+    }
+
+    private boolean isSimpleCountDistinct(AggregateCall aggCall) {
+      return aggCall.isDistinct() && aggCall.getArgList().size() == 1
+          && aggCall.getAggregation().getName().equalsIgnoreCase("count") && !aggCall.hasFilter();
+    }
+
+    private void rewriteCountDistinct(AggregateCall aggCall) {
+

Review comment:
       nit. newline

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
##########
@@ -1967,6 +1968,13 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
       generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
           HiveExceptRewriteRule.INSTANCE);
 
+      // ?

Review comment:
       We can add a comment here?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
##########
@@ -1967,6 +1968,13 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
       generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
           HiveExceptRewriteRule.INSTANCE);
 
+      // ?
+      if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNTDISTINCT_ENABLED)) {
+        generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN,
+            new HiveRewriteCountDistinctToDataSketches(conf));
+      }
+
+

Review comment:
       nit. 2 newlines

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
##########
@@ -1967,6 +1968,13 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
       generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
           HiveExceptRewriteRule.INSTANCE);
 
+      // ?
+      if (conf.getBoolVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNTDISTINCT_ENABLED)) {
+        generatePartialProgram(program, true, HepMatchOrder.TOP_DOWN,
+            new HiveRewriteCountDistinctToDataSketches(conf));

Review comment:
       As mentioned above, 1) let's use the general flag + specific one, and 2) let's not pass the full conf object.

##########
File path: ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
##########
@@ -0,0 +1,110 @@
+PREHOOK: query: create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sketch_input
+POSTHOOK: query: create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sketch_input
+PREHOOK: query: insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sketch_input
+POSTHOOK: query: insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sketch_input
+POSTHOOK: Lineage: sketch_input.category SCRIPT []
+POSTHOOK: Lineage: sketch_input.id SCRIPT []
+PREHOOK: query: explain
+select category, count(distinct id) from sketch_input group by category
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+POSTHOOK: query: explain
+select category, count(distinct id) from sketch_input group by category
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: sketch_input
+                  Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: id (type: int), category (type: char(1))
+                    outputColumnNames: id, category
+                    Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: ds_hll_sketch(id)
+                      keys: category (type: char(1))
+                      minReductionHashAggr: 0.9090909
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: char(1))
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: char(1))
+                        Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: ds_hll_sketch(VALUE._col0)
+                keys: KEY._col0 (type: char(1))
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select category, count(distinct id) from sketch_input group by category
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+POSTHOOK: query: select category, count(distinct id) from sketch_input group by category
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+a	10
+b	10

Review comment:
       Let's add a test to `sketches_materialized_view_rollup.q` (or to a new file) with the new `bi` flag on and the following steps:
   1) We create a MV that contains a count distinct over a source table.
   2) Then we explain cbo/execute a query that contains count distinct and should be rewritten to use the MV.
   3) Then we can disable `bi` acceleration and check that the query is not rewritten to use the MV (we enable it again after that).
   4) Then insert new data into the source table.
   5) Trigger a MV rebuild (we need explain to verify that it is incremental).
   6) Trigger the query in 2) again and we should hit the MV again.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.

Review comment:
       Can we describe the source expr and target expr in the rewriting here? These are complex rewritings with several function calls so it is good to show exactly the transformation that is being executed.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRewriteCountDistinctToDataSketches.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
+import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.DataSketchesFunctions;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hive.plugin.api.HiveUDFPlugin.UDFDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * This rule could rewrite {@code count(distinct(x))} calls to be calculated using sketch based functions.
+ */
+public final class HiveRewriteCountDistinctToDataSketches extends RelOptRule {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveRewriteCountDistinctToDataSketches.class);
+  private String sketchClass;
+
+  public HiveRewriteCountDistinctToDataSketches(HiveConf conf) {
+    super(operand(HiveAggregate.class, any()));
+    sketchClass = conf.getVar(ConfVars.HIVE_OPTIMIZE_REWRITE_COUNT_DISTINCT_SKETCHCLASS);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Aggregate aggregate = call.rel(0);
+
+    if (aggregate.getGroupSets().size() != 1) {
+      // not yet supported
+      return;
+    }
+
+    List<AggregateCall> newAggCalls = new ArrayList<AggregateCall>();
+
+    AggregateFactory f = HiveRelFactories.HIVE_AGGREGATE_FACTORY;
+
+    VBuilder vb = new VBuilder(aggregate);
+
+    ProjectFactory projectFactory = HiveRelFactories.HIVE_PROJECT_FACTORY;
+
+    if (aggregate.getAggCallList().equals(vb.newAggCalls)) {
+      // rule didn't made any changes
+      return;
+    }
+
+    newAggCalls = vb.newAggCalls;
+    // FIXME HiveAggregate?

Review comment:
       ?

##########
File path: ql/src/test/results/clientpositive/llap/sketches_rewrite.q.out
##########
@@ -0,0 +1,110 @@
+PREHOOK: query: create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@sketch_input
+POSTHOOK: query: create table sketch_input (id int, category char(1))
+STORED AS ORC
+TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@sketch_input
+PREHOOK: query: insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@sketch_input
+POSTHOOK: query: insert into table sketch_input values
+  (1,'a'),(1, 'a'), (2, 'a'), (3, 'a'), (4, 'a'), (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a'), (10, 'a'),
+  (6,'b'),(6, 'b'), (7, 'b'), (8, 'b'), (9, 'b'), (10, 'b'), (11, 'b'), (12, 'b'), (13, 'b'), (14, 'b'), (15, 'b')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@sketch_input
+POSTHOOK: Lineage: sketch_input.category SCRIPT []
+POSTHOOK: Lineage: sketch_input.id SCRIPT []
+PREHOOK: query: explain
+select category, count(distinct id) from sketch_input group by category
+PREHOOK: type: QUERY
+PREHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+POSTHOOK: query: explain
+select category, count(distinct id) from sketch_input group by category
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@sketch_input
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: sketch_input
+                  Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: id (type: int), category (type: char(1))
+                    outputColumnNames: id, category
+                    Statistics: Num rows: 22 Data size: 1958 Basic stats: COMPLETE Column stats: COMPLETE
+                    Group By Operator
+                      aggregations: ds_hll_sketch(id)
+                      keys: category (type: char(1))
+                      minReductionHashAggr: 0.9090909
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: char(1))
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: char(1))
+                        Statistics: Num rows: 2 Data size: 946 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: struct<lgk:int,type:string,sketch:binary>)
+            Execution mode: llap
+            LLAP IO: may be used (ACID table)
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: ds_hll_sketch(VALUE._col0)
+                keys: KEY._col0 (type: char(1))
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 458 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: char(1)), UDFToLong(ds_hll_estimate(_col1)) (type: bigint)

Review comment:
       nice :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org