You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/07/07 21:06:38 UTC

[impala] 02/03: IMPALA-9632: Implement ds_hll_sketch() and ds_hll_estimate()

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7e456dfa9d932bcdb317ad6477abc3c399abacf2
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Tue Apr 14 11:45:21 2020 +0200

    IMPALA-9632: Implement ds_hll_sketch() and ds_hll_estimate()
    
    These functions can be used to get cardinality estimates of data
    using HLL algorithm from Apache DataSketches. ds_hll_sketch()
    receives a dataset, e.g. a column from a table, and returns a
    serialized HLL sketch in string format. This can be written to a
    table or be fed directly to ds_hll_estimate() that returns the
    cardinality estimate for that sketch.
    
    Compared to ndv(), these functions bring more flexibility: once we
    have fed data to the sketch, it can be written to a table, and next
    time we can skip scanning the dataset and simply return the estimate
    using the sketch. This doesn't come for free, however, as performance
    measurements show that ndv() is 2x-3.5x faster than sketching. On the
    other hand, if we query the estimate from an existing sketch then the
    runtime is negligible.
    Another flexibility with these sketches is that they can be merged
    together so e.g. if we had saved a sketch for each of the partitions
    of a table then they can be combined with each other based on the
    query without touching the actual data.
    DataSketches HLL is sensitive to the order of the data fed to the
    sketch and, as a result, running these algorithms in Impala gives
    non-deterministic results within the error bounds of the algorithm.
    In terms of correctness, DataSketches HLL is most of the time within
    2% of the correct result; there are occasional spikes where the
    difference is bigger, but it never goes beyond 5%.
    Even though the DataSketches HLL algorithm could be parameterized,
    this implementation currently hard-codes these parameters and uses
    HLL_4 and lg_k=12.
    
    For more details about Apache DataSketches' HLL implementation see:
    https://datasketches.apache.org/docs/HLL/HLL.html
    
    Testing:
     - Added some tests running estimates for small datasets where the
       amount of data is small enough to get the correct results.
     - Ran manual tests on TPCH25.lineitem to compare performance with
       ndv(). Depending on data characteristics ndv() appears 2x-3.5x
       faster. The lower the cardinality of the dataset the bigger the
       difference between the 2 algorithms is.
     - Ran manual tests on TPCH25.lineitem and
       functional_parquet.alltypes to compare correctness with ndv(). See
       results above.
    
    Change-Id: Ic602cb6eb2bfbeab37e5e4cba11fbf0ca40b03fe
    Reviewed-on: http://gerrit.cloudera.org:8080/16000
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/codegen/impala-ir.cc                        |   1 +
 be/src/exprs/CMakeLists.txt                        |   1 +
 be/src/exprs/aggregate-functions-ir.cc             | 128 ++++++++++++++++
 be/src/exprs/aggregate-functions.h                 |  11 ++
 be/src/exprs/datasketches-functions-ir.cc          |  42 ++++++
 be/src/exprs/datasketches-functions.h              |  34 +++++
 be/src/exprs/datasketches-test.cc                  |  14 +-
 be/src/exprs/scalar-expr-evaluator.cc              |   2 +
 common/function-registry/impala_functions.py       |   3 +
 .../apache/impala/analysis/FunctionCallExpr.java   |   4 +
 .../apache/impala/catalog/AggregateFunction.java   |  11 ++
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  43 ++++++
 .../java/org/apache/impala/catalog/Function.java   |   8 +
 testdata/data/README                               |   6 +
 testdata/data/hll_sketches_from_hive.parquet       | Bin 0 -> 2928 bytes
 .../queries/QueryTest/datasketches-hll.test        | 168 +++++++++++++++++++++
 tests/query_test/test_datasketches.py              |  37 +++++
 17 files changed, 505 insertions(+), 8 deletions(-)

diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 0feaf55..2fdc82f 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -43,6 +43,7 @@
 #include "exprs/cast-functions-ir.cc"
 #include "exprs/compound-predicates-ir.cc"
 #include "exprs/conditional-functions-ir.cc"
+#include "exprs/datasketches-functions-ir.cc"
 #include "exprs/date-functions-ir.cc"
 #include "exprs/decimal-functions-ir.cc"
 #include "exprs/decimal-operators-ir.cc"
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 92b1fc6..e0ed683 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -36,6 +36,7 @@ add_library(Exprs
   compound-predicates-ir.cc
   conditional-functions.cc
   conditional-functions-ir.cc
+  datasketches-functions-ir.cc
   date-functions-ir.cc
   decimal-functions-ir.cc
   decimal-operators-ir.cc
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 0285f4d..06395f40 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -38,6 +38,7 @@
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
+#include "thirdparty/datasketches/hll.hpp"
 #include "util/arithmetic-util.h"
 #include "util/mpfit-util.h"
 #include "util/pretty-printer.h"
@@ -1610,6 +1611,116 @@ BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal&
   return estimate;
 }
 
+/// Config for DataSketches HLL algorithm to set the size of each entry within the
+/// sketch.
+/// Introducing this variable in the .cc to avoid including the whole DataSketches HLL
+/// functionality into the header.
+const datasketches::target_hll_type DS_HLL_TYPE = datasketches::target_hll_type::HLL_4;
+
+/// Auxiliary function that receives a hll_sketch and returns the serialized version of
+/// it wrapped into a StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
+/// functionality into the header.
+StringVal SerializeDsHllSketch(FunctionContext* ctx,
+    const datasketches::hll_sketch& sketch) {
+  std::stringstream serialized_sketch;
+  sketch.serialize_compact(serialized_sketch);
+  std::string serialized_sketch_str = serialized_sketch.str();
+  StringVal dst(ctx, serialized_sketch_str.size());
+  memcpy(dst.ptr, serialized_sketch_str.c_str(), serialized_sketch_str.size());
+  return dst;
+}
+
+void AggregateFunctions::DsHllInit(FunctionContext* ctx, StringVal* dst) {
+  AllocBuffer(ctx, dst, sizeof(datasketches::hll_sketch));
+  if (UNLIKELY(dst->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  // Note that hll_sketch always has the same size regardless of the amount of data it
+  // keeps track of. This is because it's a wrapper class that holds all the inserted
+  // data on the heap. Here, we put only the wrapper object into the StringVal buffer.
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(dst->ptr);
+  *sketch_ptr = datasketches::hll_sketch(DS_SKETCH_CONFIG, DS_HLL_TYPE);
+}
+
+template <typename T>
+void AggregateFunctions::DsHllUpdate(FunctionContext* ctx, const T& src,
+    StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(dst->ptr);
+  sketch_ptr->update(src.val);
+}
+
+// Specialize for StringVal
+template <>
+void AggregateFunctions::DsHllUpdate(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(dst->ptr);
+  sketch_ptr->update(reinterpret_cast<char*>(src.ptr), src.len);
+}
+
+StringVal AggregateFunctions::DsHllSerialize(FunctionContext* ctx,
+    const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
+  StringVal dst = SerializeDsHllSketch(ctx, *sketch_ptr);
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+void AggregateFunctions::DsHllMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  DCHECK(!src.is_null);
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch src_sketch =
+      datasketches::hll_sketch::deserialize((void*)src.ptr, src.len);
+
+  datasketches::hll_sketch* dst_sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(dst->ptr);
+
+  datasketches::hll_union union_sketch(DS_SKETCH_CONFIG);
+  union_sketch.update(src_sketch);
+  union_sketch.update(*dst_sketch_ptr);
+
+  *dst_sketch_ptr = union_sketch.get_result(DS_HLL_TYPE);
+}
+
+BigIntVal AggregateFunctions::DsHllFinalize(FunctionContext* ctx, const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
+  BigIntVal estimate = sketch_ptr->get_estimate();
+  ctx->Free(src.ptr);
+  return (estimate == 0) ? BigIntVal::null() : estimate;
+}
+
+StringVal AggregateFunctions::DsHllFinalizeSketch(FunctionContext* ctx,
+    const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::hll_sketch));
+  datasketches::hll_sketch* sketch_ptr =
+      reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
+  StringVal result_str = StringVal::null();
+  if (sketch_ptr->get_estimate() > 0.0) {
+    result_str = SerializeDsHllSketch(ctx, *sketch_ptr);
+  }
+  ctx->Free(src.ptr);
+  return result_str;
+}
+
 /// Intermediate aggregation state for the SampledNdv() function.
 /// Stores NUM_HLL_BUCKETS of the form <row_count, hll_state>.
 /// The 'row_count' keeps track of how many input rows were aggregated into that
@@ -2547,6 +2658,23 @@ template void AggregateFunctions::HllUpdate(
 template void AggregateFunctions::HllUpdate(
     FunctionContext*, const DateVal&, const IntVal&, StringVal*);
 
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const BooleanVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const TinyIntVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const SmallIntVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const IntVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const BigIntVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const FloatVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const DoubleVal&, StringVal*);
+template void AggregateFunctions::DsHllUpdate(
+    FunctionContext*, const DateVal&, StringVal*);
+
 template void AggregateFunctions::SampledNdvUpdate(
     FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*);
 template void AggregateFunctions::SampledNdvUpdate(
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index b23525b..bc056b0 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -233,6 +233,17 @@ class AggregateFunctions {
   static uint64_t HllFinalEstimate(
       const uint8_t* buckets, int hll_len = AggregateFunctions::DEFAULT_HLL_LEN);
 
+  /// These functions provide cardinality estimates similarly to ndv() but these use HLL
+  /// algorithm from Apache Datasketches.
+  static constexpr int DS_SKETCH_CONFIG = 12; // lg_k; 2^DS_SKETCH_CONFIG buckets
+  static void DsHllInit(FunctionContext*, StringVal* slot);
+  template <typename T>
+  static void DsHllUpdate(FunctionContext*, const T& src, StringVal* dst);
+  static StringVal DsHllSerialize(FunctionContext*, const StringVal& src);
+  static void DsHllMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static BigIntVal DsHllFinalize(FunctionContext*, const StringVal& src);
+  static StringVal DsHllFinalizeSketch(FunctionContext*, const StringVal& src);
+
   /// Estimates the number of distinct values (NDV) based on a sample of data and the
   /// corresponding sampling rate. The main idea of this function is to collect several
   /// (x,y) data points where x is the number of rows and y is the corresponding NDV
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
new file mode 100644
index 0000000..652c088
--- /dev/null
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "datasketches-functions.h"
+
+#include "runtime/runtime-state.h"
+#include "thirdparty/datasketches/hll.hpp"
+#include "udf/udf-internal.h"
+
+namespace impala {
+
+BigIntVal DataSketchesFunctions::DsHllEstimate(FunctionContext* ctx,
+    const StringVal& serialized_sketch) {
+  if (serialized_sketch.is_null || serialized_sketch.len == 0) return BigIntVal::null();
+  try {
+    datasketches::hll_sketch sketch =
+        datasketches::hll_sketch::deserialize((void*)serialized_sketch.ptr,
+            serialized_sketch.len);
+    return sketch.get_estimate();
+  } catch (const std::invalid_argument&) {
+    // Deserialization throws if the input string is not a serialized sketch.
+    ctx->SetError("Unable to deserialize sketch.");
+    return BigIntVal::null();
+  }
+}
+
+}
+
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
new file mode 100644
index 0000000..bcbec89
--- /dev/null
+++ b/be/src/exprs/datasketches-functions.h
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "udf/udf.h"
+
+namespace impala {
+
+using impala_udf::BigIntVal;
+using impala_udf::FunctionContext;
+using impala_udf::StringVal;
+
+class DataSketchesFunctions {
+public:
+  static BigIntVal DsHllEstimate(FunctionContext*, const StringVal&);
+};
+
+}
+
diff --git a/be/src/exprs/datasketches-test.cc b/be/src/exprs/datasketches-test.cc
index bafeb8a..afd2505 100644
--- a/be/src/exprs/datasketches-test.cc
+++ b/be/src/exprs/datasketches-test.cc
@@ -17,8 +17,7 @@
 
 #include "thirdparty/datasketches/hll.hpp"
 
-#include <fstream>
-#include <iostream>
+#include <sstream>
 
 #include "testutil/gtest-util.h"
 
@@ -62,12 +61,11 @@ TEST(TestDataSketchesHll, UseDataSketchesInterface) {
     union_sketch.update(sketch2);
     datasketches::hll_sketch sketch = union_sketch.get_result(type);
 
-    // Approximate result should be in the range of 1.5% to the accurate number. Picked
-    // this threshold to be on the safe side and to make sure that this test won't start
-    // failing once in a while.
-    int accurate_result = 150000;
-    int error_range = accurate_result * 0.015;
-    EXPECT_LE(std::abs(sketch.get_estimate() - accurate_result), error_range);
+    // These sketching algorithms are sensitive to the order of the inputs and may
+    // return different estimates within the error bounds of the algorithm. However,
+    // the order of the inputs fed to the sketches is fixed here so we get the same
+    // estimate every time we run this test.
+    EXPECT_EQ(152040, (int)sketch.get_estimate());
   }
 }
 
diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc
index 57a4098..14ec10e 100644
--- a/be/src/exprs/scalar-expr-evaluator.cc
+++ b/be/src/exprs/scalar-expr-evaluator.cc
@@ -28,6 +28,7 @@
 #include "exprs/cast-functions.h"
 #include "exprs/compound-predicates.h"
 #include "exprs/conditional-functions.h"
+#include "datasketches-functions.h"
 #include "exprs/date-functions.h"
 #include "exprs/decimal-functions.h"
 #include "exprs/decimal-operators.h"
@@ -431,6 +432,7 @@ void ScalarExprEvaluator::InitBuiltinsDummy() {
   CastFunctions::CastToBooleanVal(nullptr, TinyIntVal::null());
   CompoundPredicate::Not(nullptr, BooleanVal::null());
   ConditionalFunctions::NullIfZero(nullptr, TinyIntVal::null());
+  DataSketchesFunctions::DsHllEstimate(nullptr, StringVal::null());
   DecimalFunctions::Precision(nullptr, DecimalVal::null());
   DecimalOperators::CastToDecimalVal(nullptr, DecimalVal::null());
   InPredicate::InIterate(nullptr, BigIntVal::null(), 0, nullptr);
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index 2ea8597..18c86d0 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -929,6 +929,9 @@ visible_functions = [
   [['mask_hash'], 'BOOLEAN', ['BOOLEAN'], 'impala::MaskFunctions::MaskHash'],
   [['mask_hash'], 'TIMESTAMP', ['TIMESTAMP'], 'impala::MaskFunctions::MaskHash'],
   [['mask_hash'], 'DATE', ['DATE'], 'impala::MaskFunctions::MaskHash'],
+
+  # Functions to use Apache DataSketches functionality
+  [['ds_hll_estimate'], 'BIGINT', ['STRING'], '_ZN6impala21DataSketchesFunctions13DsHllEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
 ]
 
 invisible_functions = [
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index aa78a31..86af1c2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -633,6 +633,10 @@ public class FunctionCallExpr extends Expr {
 
       AggregateFunction aggFn = (AggregateFunction)fn_;
       if (aggFn.ignoresDistinct()) params_.setIsDistinct(false);
+
+      if (aggFn.isUnsupported()) {
+        throw new AnalysisException(getFunctionNotFoundError(argTypes));
+      }
     }
 
     if (params_.isIgnoreNulls() && !isAnalyticFnCall_) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
index 70d45b5..f4786b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
@@ -145,6 +145,17 @@ public class AggregateFunction extends Function {
     return fn;
   }
 
+  public static AggregateFunction createUnsupportedBuiltin(Db db, String name,
+      List<Type> argTypes, Type retType, Type intermediateType) {
+    AggregateFunction fn = new AggregateFunction(new FunctionName(db.getName(), name),
+        argTypes, retType, intermediateType, null, null, null, null, null, null, null,
+        null);
+    fn.setBinaryType(TFunctionBinaryType.BUILTIN);
+    fn.isAggregateFn_ = true;
+    fn.setUnsupported();
+    return fn;
+  }
+
   public static AggregateFunction createAnalyticBuiltin(Db db, String name,
       List<Type> argTypes, Type retType, Type intermediateType) {
     return createAnalyticBuiltin(db, name, argTypes, retType, intermediateType, null,
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 9fa41bb..36dc88c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -377,6 +377,22 @@ public class BuiltinsDb extends Db {
              "9HllUpdateIN10impala_udf7DateValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
         .build();
 
+    private static final Map<Type, String> DS_HLL_UPDATE_SYMBOL =
+      ImmutableMap.<Type, String>builder()
+        .put(Type.TINYINT,
+            "11DsHllUpdateIN10impala_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.INT,
+            "11DsHllUpdateIN10impala_udf6IntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.BIGINT,
+            "11DsHllUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.FLOAT,
+            "11DsHllUpdateIN10impala_udf8FloatValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.DOUBLE,
+            "11DsHllUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.STRING,
+            "11DsHllUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
+        .build();
+
   private static final Map<Type, String> SAMPLED_NDV_UPDATE_SYMBOL =
       ImmutableMap.<Type, String>builder()
         .put(Type.BOOLEAN,
@@ -1035,6 +1051,33 @@ public class BuiltinsDb extends Db {
           "_Z20IncrementNdvFinalizePN10impala_udf15FunctionContextERKNS_9StringValE",
           true, false, true));
 
+      // DataSketches HLL
+      if (DS_HLL_UPDATE_SYMBOL.containsKey(t)) {
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_hll_sketch_and_estimate",
+            Lists.newArrayList(t), Type.BIGINT, Type.STRING,
+            prefix + "9DsHllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+            prefix + DS_HLL_UPDATE_SYMBOL.get(t),
+            prefix + "10DsHllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+            prefix + "14DsHllSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+            prefix + "13DsHllFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+            true, false, true));
+
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_hll_sketch",
+            Lists.newArrayList(t), Type.STRING, Type.STRING,
+            prefix + "9DsHllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+            prefix + DS_HLL_UPDATE_SYMBOL.get(t),
+            prefix + "10DsHllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+            prefix + "14DsHllSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+            prefix + "19DsHllFinalizeSketchEPN10impala_udf15FunctionContextERKNS1_" +
+                "9StringValE", true, false, true));
+      } else {
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db,
+            "ds_hll_sketch_and_estimate", Lists.newArrayList(t), Type.STRING,
+            Type.STRING));
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db, "ds_hll_sketch",
+            Lists.newArrayList(t), Type.STRING, Type.STRING));
+      }
+
       // SAMPLED_NDV.
       // Size needs to be kept in sync with SampledNdvState in the BE.
       int NUM_HLL_BUCKETS = 32;
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 2d202df..609743c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -110,6 +110,11 @@ public class Function extends CatalogObjectImpl {
   // native and IR functions, but only Java functions created without a signature.
   private boolean isPersistent_;
 
+  // Functions with specific parameters can be marked as unsupported so that during
+  // analysis the query can be rejected without Impala trying to cast parameters to a
+  // different type that would be supported.
+  protected boolean isUnsupported_;
+
   public Function(FunctionName name, Type[] argTypes,
       Type retType, boolean varArgs) {
     this.name_ = name;
@@ -125,6 +130,7 @@ public class Function extends CatalogObjectImpl {
       this.retType_ = retType;
     }
     this.userVisible_ = true;
+    this.isUnsupported_ = false;
   }
 
   public Function(FunctionName name, List<Type> args,
@@ -163,12 +169,14 @@ public class Function extends CatalogObjectImpl {
     Preconditions.checkState(argTypes_.length > 0);
     return argTypes_[argTypes_.length - 1];
   }
+  public boolean isUnsupported() { return isUnsupported_; }
 
   public void setLocation(HdfsUri loc) { location_ = loc; }
   public void setBinaryType(TFunctionBinaryType type) { binaryType_ = type; }
   public void setHasVarArgs(boolean v) { hasVarArgs_ = v; }
   public void setIsPersistent(boolean v) { isPersistent_ = v; }
   public void setUserVisible(boolean b) { userVisible_ = b; }
+  protected void setUnsupported() { isUnsupported_ = true; }
 
   // Returns a string with the signature in human readable format:
   // FnName(argtype1, argtyp2).  e.g. Add(int, int)
diff --git a/testdata/data/README b/testdata/data/README
index b91bcd5..9469d56 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -499,6 +499,12 @@ corrupt_root_type.orc:
 ORC file for IMPALA-9249, generated by fuzz test. The root type of the schema is not
 struct, which used to hit a DCHECK.
 
+hll_sketches_from_hive.parquet:
+This file contains a table that has some string columns to store serialized Apache
+DataSketches HLL sketches created by Hive. Each column contains a sketch for a
+specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
+STRING, CHAR and VARCHAR. Has an additional column for NULL values.
+
 hudi_parquet:
 IMPALA-8778: Support read Apache Hudi tables
 Hudi parquet is a special format of parquet files managed by Apache Hudi
diff --git a/testdata/data/hll_sketches_from_hive.parquet b/testdata/data/hll_sketches_from_hive.parquet
new file mode 100644
index 0000000..caf3fbf
Binary files /dev/null and b/testdata/data/hll_sketches_from_hive.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-hll.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-hll.test
new file mode 100644
index 0000000..cfaac31
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-hll.test
@@ -0,0 +1,168 @@
+====
+---- QUERY
+# Use a small table for testing Datasketches HLL functions through Impala to make sure
+# that these approximate functions give the correct result. For testing Impala
+# functionality there is no need to test how Datasketches HLL approximates count
+# distinct values, so a small table is enough.
+select
+    ds_hll_estimate(ds_hll_sketch(tinyint_col)),
+    ds_hll_estimate(ds_hll_sketch(int_col)),
+    ds_hll_estimate(ds_hll_sketch(bigint_col)),
+    ds_hll_estimate(ds_hll_sketch(float_col)),
+    ds_hll_estimate(ds_hll_sketch(double_col)),
+    ds_hll_estimate(ds_hll_sketch(string_col))
+from functional_parquet.alltypessmall
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+select
+    ds_hll_sketch_and_estimate(tinyint_col),
+    ds_hll_sketch_and_estimate(int_col),
+    ds_hll_sketch_and_estimate(bigint_col),
+    ds_hll_sketch_and_estimate(float_col),
+    ds_hll_sketch_and_estimate(double_col),
+    ds_hll_sketch_and_estimate(string_col)
+from functional_parquet.alltypessmall
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that unsupported types give error with ds_hll_sketch().
+select ds_hll_sketch(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch(BOOLEAN)
+====
+---- QUERY
+select ds_hll_sketch(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch(SMALLINT)
+====
+---- QUERY
+select ds_hll_sketch(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch(DATE)
+====
+---- QUERY
+select ds_hll_sketch(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch(DECIMAL(9,0))
+====
+---- QUERY
+# Check that unsupported types give error with ds_hll_sketch_and_estimate().
+select ds_hll_sketch_and_estimate(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch_and_estimate(BOOLEAN)
+====
+---- QUERY
+select ds_hll_sketch_and_estimate(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch_and_estimate(SMALLINT)
+====
+---- QUERY
+select ds_hll_sketch_and_estimate(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch_and_estimate(DATE)
+====
+---- QUERY
+select ds_hll_sketch_and_estimate(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_hll_sketch_and_estimate(DECIMAL(9,0))
+====
+---- QUERY
+# Check if HLL works with null values.
+select
+    ds_hll_estimate(ds_hll_sketch(null_str)),
+    ds_hll_estimate(ds_hll_sketch(null_int)),
+    ds_hll_estimate(ds_hll_sketch(null_double)),
+    ds_hll_estimate(ds_hll_sketch(some_nulls)),
+    ds_hll_sketch_and_estimate(null_str),
+    ds_hll_sketch_and_estimate(null_int),
+    ds_hll_sketch_and_estimate(null_double),
+    ds_hll_sketch_and_estimate(some_nulls)
+from functional_parquet.nullrows
+---- RESULTS
+NULL,NULL,NULL,6,NULL,NULL,NULL,6
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check if HLL works for empty datasets.
+select
+    ds_hll_estimate(ds_hll_sketch(field)),
+    ds_hll_estimate(ds_hll_sketch(f2)),
+    ds_hll_sketch_and_estimate(field),
+    ds_hll_sketch_and_estimate(f2)
+from functional_parquet.emptytable
+---- RESULTS
+NULL,NULL,NULL,NULL
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Write sketches to a table as string and get an estimate from the written sketch.
+# Note, the plan is to write sketches as binary instead of strings. For this we have to
+# wait for the binary support (IMPALA-9482).
+create table sketch_store
+    (year int, month int, date_sketch string, float_sketch string)
+stored as parquet;
+insert into sketch_store
+    select
+        year,
+        month,
+        ds_hll_sketch(date_string_col),
+        ds_hll_sketch(float_col)
+    from functional_parquet.alltypessmall
+    group by year, month;
+select
+    year,
+    month,
+    ds_hll_estimate(date_sketch),
+    ds_hll_estimate(float_sketch)
+from sketch_store;
+---- RESULTS
+2009,1,3,10
+2009,2,3,10
+2009,3,3,10
+2009,4,3,10
+---- TYPES
+INT,INT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that ds_hll_estimate returns error for strings that are not serialized sketches.
+select ds_hll_estimate(date_string_col) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Check that ds_hll_estimate returns null for null inputs.
+select ds_hll_estimate(c) from functional_parquet.nulltable;
+---- RESULTS
+NULL
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that sketches made by Hive can be read and used for estimating by Impala.
+select
+    ds_hll_estimate(ti) as ti,
+    ds_hll_estimate(i) as i,
+    ds_hll_estimate(bi) as bi,
+    ds_hll_estimate(f) as f,
+    ds_hll_estimate(d) as d,
+    ds_hll_estimate(s) as s,
+    ds_hll_estimate(c) as c,
+    ds_hll_estimate(v) as v,
+    ds_hll_estimate(nc) as nc
+from hll_sketches_from_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,5,6,4,3,3,NULL
+====
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
new file mode 100644
index 0000000..2a18a7f
--- /dev/null
+++ b/tests/query_test/test_datasketches.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.file_utils import create_table_from_parquet
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_single_exec_option_dimension
+
+
+class TestDatasketches(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestDatasketches, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['parquet'])
+
+  def test_hll(self, vector, unique_database):
+    create_table_from_parquet(self.client, unique_database, 'hll_sketches_from_hive')
+    self.run_test_case('QueryTest/datasketches-hll', vector, unique_database)