You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/07/31 17:24:01 UTC

[impala] 01/02: IMPALA-9959: Implement ds_kll_sketch() and ds_kll_quantile() functions

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 033a4607e2c9cd5a107a3af01f3fb3490bc5bc6e
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Fri Jul 17 09:06:35 2020 +0200

    IMPALA-9959: Implement ds_kll_sketch() and ds_kll_quantile() functions
    
    ds_kll_sketch() is an aggregate function that receives a float
    parameter (e.g. a float column of a table) and returns a serialized
    Apache DataSketches KLL sketch of the input data set wrapped into
    STRING type. This sketch can be saved into a table or view and later
    used for quantile approximations. ds_kll_quantile() receives two
    parameters: a STRING parameter that contains a serialized KLL sketch
    and a DOUBLE that represents the rank of the quantile in the range of
    [0,1]. E.g. rank=0.1 means the approximate value in the sketch where
    10% of the sketched items are less than or equal to this value.
    
    Testing:
      - Added automated tests on small data sets to check the basic
        functionality of sketching and getting a quantile approximate.
      - Tested on TPCH25_parquet.lineitem to check that sketching and
        approximating works on bigger scale as well where serialize/merge
        phases are also required. On this scale the error range of the
        quantile approximation is within 1-1.5%
    
    Change-Id: I11de5fe10bb5d0dd42fb4ee45c4f21cb31963e52
    Reviewed-on: http://gerrit.cloudera.org:8080/16235
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             |  95 +++++++++++++-
 be/src/exprs/aggregate-functions.h                 |   7 +
 be/src/exprs/datasketches-common.cc                |  18 ++-
 be/src/exprs/datasketches-common.h                 |  16 +--
 be/src/exprs/datasketches-functions-ir.cc          |  28 +++-
 be/src/exprs/datasketches-functions.h              |  16 ++-
 common/function-registry/impala_functions.py       |   5 +-
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  10 ++
 testdata/data/README                               |  10 ++
 testdata/data/kll_sketches_from_hive.parquet       | Bin 0 -> 2501 bytes
 .../queries/QueryTest/datasketches-kll.test        | 146 +++++++++++++++++++++
 tests/query_test/test_datasketches.py              |   4 +
 12 files changed, 333 insertions(+), 22 deletions(-)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index e3db0cc..1762860 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,6 +40,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/kll_sketch.hpp"
 #include "util/arithmetic-util.h"
 #include "util/mpfit-util.h"
 #include "util/pretty-printer.h"
@@ -54,6 +55,8 @@ using std::min_element;
 using std::nth_element;
 using std::pop_heap;
 using std::push_heap;
+using std::string;
+using std::stringstream;
 
 namespace {
 // Threshold for each precision where it's better to use linear counting instead
@@ -1612,6 +1615,17 @@ BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal&
   return estimate;
 }
 
+/// Copies the content of 'str_stream' into a new StringVal allocated through 'ctx'.
+/// If the result StringVal comes back null (allocation failed) it is returned as is.
+StringVal StringStreamToStringVal(FunctionContext* ctx,
+    const stringstream& str_stream) {
+  const string str = str_stream.str();
+  StringVal dst(ctx, str.size());
+  if (UNLIKELY(dst.is_null)) return dst;
+  memcpy(dst.ptr, str.data(), str.size());
+  return dst;
+}
+
 /// Auxiliary function that receives a hll_sketch and returns the serialized version of
 /// it wrapped into a StringVal.
 /// Introducing this function in the .cc to avoid including the whole DataSketches HLL
@@ -1620,10 +1634,7 @@ StringVal SerializeCompactDsHllSketch(FunctionContext* ctx,
     const datasketches::hll_sketch& sketch) {
   std::stringstream serialized_input;
   sketch.serialize_compact(serialized_input);
-  std::string serialized_input_str = serialized_input.str();
-  StringVal dst(ctx, serialized_input_str.size());
-  memcpy(dst.ptr, serialized_input_str.c_str(), serialized_input_str.size());
-  return dst;
+  return StringStreamToStringVal(ctx, serialized_input);
 }
 
 /// Auxiliary function that receives a hll_union, gets the underlying HLL sketch from the
@@ -1637,6 +1648,17 @@ StringVal SerializeDsHllUnion(FunctionContext* ctx,
   return SerializeCompactDsHllSketch(ctx, sketch);
 }
 
+/// Auxiliary function that receives a kll_sketch<float> and returns the serialized
+/// version of it wrapped into a StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches KLL
+/// functionality into the header.
+StringVal SerializeDsKllSketch(FunctionContext* ctx,
+    const datasketches::kll_sketch<float>& sketch) {
+  std::stringstream serialized_sketch;
+  sketch.serialize(serialized_sketch);
+  return StringStreamToStringVal(ctx, serialized_sketch);
+}
+
 void AggregateFunctions::DsHllInit(FunctionContext* ctx, StringVal* dst) {
   AllocBuffer(ctx, dst, sizeof(datasketches::hll_sketch));
   if (UNLIKELY(dst->is_null)) {
@@ -1743,10 +1765,10 @@ void AggregateFunctions::DsHllUnionUpdate(FunctionContext* ctx, const StringVal&
   if (src.is_null) return;
   DCHECK(!dst->is_null);
   DCHECK_EQ(dst->len, sizeof(datasketches::hll_union));
-  // These parameters might be overwritten by DeserializeHllSketch() to use the settings
+  // These parameters might be overwritten by DeserializeDsSketch() to use the settings
   // from the deserialized sketch from 'src'.
   datasketches::hll_sketch src_sketch(DS_SKETCH_CONFIG, DS_HLL_TYPE);
-  if (!DeserializeHllSketch(src, &src_sketch)) {
+  if (!DeserializeDsSketch(src, &src_sketch)) {
     LogSketchDeserializationError(ctx);
     return;
   }
@@ -1798,6 +1820,78 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
   return result;
 }
 
+void AggregateFunctions::DsKllInit(FunctionContext* ctx, StringVal* dst) {
+  AllocBuffer(ctx, dst, sizeof(datasketches::kll_sketch<float>));
+  if (UNLIKELY(dst->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  // Note, that kll_sketch will always have the same size regardless of the amount of
+  // data it keeps track of. This is because it's a wrapper class that holds all the
+  // inserted data on heap. Here, we put only the wrapper class into a StringVal.
+  // The fresh buffer does not contain a kll_sketch object yet, so one is constructed in
+  // place. DsKllSerialize() and DsKllFinalizeSketch() destroy it.
+  new (dst->ptr) datasketches::kll_sketch<float>();
+}
+
+void AggregateFunctions::DsKllUpdate(FunctionContext* ctx, const FloatVal& src,
+    StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
+  datasketches::kll_sketch<float>* sketch_ptr =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
+  sketch_ptr->update(src.val);
+}
+
+StringVal AggregateFunctions::DsKllSerialize(FunctionContext* ctx,
+    const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::kll_sketch<float>));
+  datasketches::kll_sketch<float>* sketch_ptr =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(src.ptr);
+  StringVal dst = SerializeDsKllSketch(ctx, *sketch_ptr);
+  // The sketch keeps its items on heap, so it has to be destroyed before its buffer
+  // is freed. Otherwise those items would leak.
+  sketch_ptr->~kll_sketch();
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+void AggregateFunctions::DsKllMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  DCHECK(!src.is_null);
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
+  // DeserializeDsSketch() catches the exceptions thrown by the DataSketches library so
+  // that a malformed input is reported as an error instead of escaping this function.
+  datasketches::kll_sketch<float> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
+
+  datasketches::kll_sketch<float>* dst_sketch_ptr =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
+
+  dst_sketch_ptr->merge(src_sketch);
+}
+
+StringVal AggregateFunctions::DsKllFinalizeSketch(FunctionContext* ctx,
+    const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::kll_sketch<float>));
+  datasketches::kll_sketch<float>* sketch_ptr =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(src.ptr);
+  // A sketch that has not received any non-NULL input is finalized as NULL.
+  StringVal dst = sketch_ptr->get_n() == 0 ?
+      StringVal::null() : SerializeDsKllSketch(ctx, *sketch_ptr);
+  // Destroy the sketch before freeing its buffer to release its heap-held items.
+  sketch_ptr->~kll_sketch();
+  ctx->Free(src.ptr);
+  return dst;
+}
+
 /// Intermediate aggregation state for the SampledNdv() function.
 /// Stores NUM_HLL_BUCKETS of the form <row_count, hll_state>.
 /// The 'row_count' keeps track of how many input rows were aggregated into that
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index d7fb986..487c451 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -250,6 +250,13 @@ class AggregateFunctions {
   static void DsHllUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static StringVal DsHllUnionFinalize(FunctionContext*, const StringVal& src);
 
+  /// These functions implement the ds_kll_sketch() aggregate (Apache DataSketches KLL).
+  static void DsKllInit(FunctionContext*, StringVal* slot);
+  static void DsKllUpdate(FunctionContext*, const FloatVal& src, StringVal* dst);
+  static StringVal DsKllSerialize(FunctionContext*, const StringVal& src);
+  static void DsKllMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsKllFinalizeSketch(FunctionContext*, const StringVal& src);
+
   /// Estimates the number of distinct values (NDV) based on a sample of data and the
   /// corresponding sampling rate. The main idea of this function is to collect several
   /// (x,y) data points where x is the number of rows and y is the corresponding NDV
diff --git a/be/src/exprs/datasketches-common.cc b/be/src/exprs/datasketches-common.cc
index 0fe278c..c9bdcaf 100644
--- a/be/src/exprs/datasketches-common.cc
+++ b/be/src/exprs/datasketches-common.cc
@@ -19,28 +19,36 @@
 
 #include "common/logging.h"
 #include "udf/udf-internal.h"
+#include "thirdparty/datasketches/kll_sketch.hpp"
 
 namespace impala {
 
 using datasketches::hll_sketch;
+using datasketches::kll_sketch;
 using impala_udf::StringVal;
 
 void LogSketchDeserializationError(FunctionContext* ctx) {
   ctx->SetError("Unable to deserialize sketch.");
 }
 
-bool DeserializeHllSketch(const StringVal& serialized_sketch, hll_sketch* sketch) {
+template<class T>
+bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch) {
   DCHECK(sketch != nullptr);
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return false;
   try {
-    *sketch = hll_sketch::deserialize((void*)serialized_sketch.ptr,
-        serialized_sketch.len);
+    *sketch = T::deserialize((void*)serialized_sketch.ptr, serialized_sketch.len);
     return true;
-  } catch (const std::invalid_argument&) {
-    // Deserialization throws if the input string is not a serialized sketch.
+  } catch (const std::exception&) {
+    // Deserialization throws e.g. if the input string is not a serialized sketch of
+    // type T. Any such exception is reported to the caller as a failure.
     return false;
   }
 }
 
+template bool DeserializeDsSketch(const StringVal& serialized_sketch,
+    hll_sketch* sketch);
+template bool DeserializeDsSketch(const StringVal& serialized_sketch,
+    kll_sketch<float>* sketch);
+
 }
 
diff --git a/be/src/exprs/datasketches-common.h b/be/src/exprs/datasketches-common.h
index 3d4f43c..7560692 100644
--- a/be/src/exprs/datasketches-common.h
+++ b/be/src/exprs/datasketches-common.h
@@ -37,13 +37,13 @@ const int DS_SKETCH_CONFIG = 12;
 /// Logs a common error message saying that sketch deserialization failed.
 void LogSketchDeserializationError(FunctionContext* ctx);
 
-/// Receives a serialized DataSketches HLL sketch in 'serialized_sketch', deserializes it
-/// and puts the deserialized sketch into 'sketch'. The outgoing 'sketch' will hold the
-/// same configs as 'serialized_sketch' regardless of what was provided when it was
-/// constructed before this function call. Returns false if the deserialization
-/// fails, true otherwise.
-bool DeserializeHllSketch(const StringVal& serialized_sketch,
-    datasketches::hll_sketch* sketch) WARN_UNUSED_RESULT;
-
+/// Receives a serialized DataSketches sketch (either HLL or KLL) in
+/// 'serialized_sketch', deserializes it and puts the deserialized sketch into 'sketch'.
+/// The outgoing 'sketch' will hold the same configs as 'serialized_sketch' regardless of
+/// what was provided when it was constructed before this function call. Returns false if
+/// 'serialized_sketch' is NULL or empty, or if deserialization fails; true otherwise.
+template<class T>
+bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch)
+    WARN_UNUSED_RESULT;
 }
 
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index bba537d..d2898bc 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -18,20 +18,49 @@
 #include "exprs/datasketches-functions.h"
 
 #include "exprs/datasketches-common.h"
+#include "gutil/strings/substitute.h"
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/kll_sketch.hpp"
+#include "udf/udf-internal.h"
 
 namespace impala {
 
+using strings::Substitute;
+
 BigIntVal DataSketchesFunctions::DsHllEstimate(FunctionContext* ctx,
     const StringVal& serialized_sketch) {
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return BigIntVal::null();
   datasketches::hll_sketch sketch(DS_SKETCH_CONFIG, DS_HLL_TYPE);
-  if (!DeserializeHllSketch(serialized_sketch, &sketch)) {
+  if (!DeserializeDsSketch(serialized_sketch, &sketch)) {
     LogSketchDeserializationError(ctx);
     return BigIntVal::null();
   }
   return sketch.get_estimate();
 }
 
+FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
+    const StringVal& serialized_sketch, const DoubleVal& rank) {
+  if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null();
+  // A NULL rank yields NULL, just like a NULL sketch does.
+  if (rank.is_null) return FloatVal::null();
+  // Written in negated form so that a NaN rank is rejected as well.
+  if (!(rank.val >= 0.0 && rank.val <= 1.0)) {
+    ctx->SetError("Rank parameter should be in the range of [0,1]");
+    return FloatVal::null();
+  }
+  datasketches::kll_sketch<float> sketch;
+  if (!DeserializeDsSketch(serialized_sketch, &sketch)) {
+    LogSketchDeserializationError(ctx);
+    return FloatVal::null();
+  }
+  try {
+    return sketch.get_quantile(rank.val);
+  } catch (const std::exception& e) {
+    ctx->SetError(Substitute("Error while getting quantile from DataSketches KLL. "
+        "Message: $0", e.what()).c_str());
+    return FloatVal::null();
+  }
+}
+
 }
 
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index bcbec89..143fd69 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -22,12 +22,26 @@
 namespace impala {
 
 using impala_udf::BigIntVal;
+using impala_udf::DoubleVal;
+using impala_udf::FloatVal;
 using impala_udf::FunctionContext;
 using impala_udf::StringVal;
 
 class DataSketchesFunctions {
 public:
-  static BigIntVal DsHllEstimate(FunctionContext*, const StringVal&);
+  /// 'serialized_sketch' is expected as a serialized Apache DataSketches HLL sketch. If
+  /// it is not then the query fails. Otherwise, returns the count(distinct) estimate
+  /// from the sketch. A NULL or empty 'serialized_sketch' yields NULL.
+  static BigIntVal DsHllEstimate(FunctionContext* ctx,
+      const StringVal& serialized_sketch);
+
+  /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
+  /// it is not then the query fails. 'rank' is used to identify which item (estimate) to
+  /// return from the sketched dataset. E.g. 0.1 means the item such that 10% of the
+  /// sketched dataset is less than or equal to it. 'rank' should be in the range of
+  /// [0,1], otherwise the query fails. NULL or empty 'serialized_sketch' yields NULL.
+  static FloatVal DsKllQuantile(FunctionContext* ctx, const StringVal& serialized_sketch,
+      const DoubleVal& rank);
 };
 
 }
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index c366552..8398785 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -931,7 +931,10 @@ visible_functions = [
   [['mask_hash'], 'DATE', ['DATE'], 'impala::MaskFunctions::MaskHash'],
 
   # Functions to use Apache DataSketches functionality
-  [['ds_hll_estimate'], 'BIGINT', ['STRING'], '_ZN6impala21DataSketchesFunctions13DsHllEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
+  [['ds_hll_estimate'], 'BIGINT', ['STRING'],
+      '_ZN6impala21DataSketchesFunctions13DsHllEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
+  [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
+      '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
 ]
 
 invisible_functions = [
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 514e49d..5969549 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1323,6 +1323,16 @@ public class BuiltinsDb extends Db {
         prefix + "10CountMergeEPN10impala_udf15FunctionContextERKNS1_9BigIntValEPS4_",
         null, null));
 
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_kll_sketch",
+        Lists.<Type>newArrayList(Type.FLOAT), Type.STRING, Type.STRING,
+        prefix + "9DsKllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix + "11DsKllUpdateEPN10impala_udf15FunctionContextERKNS1_8FloatValEPNS1_" +
+            "9StringValE",
+        prefix + "10DsKllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix + "14DsKllSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix + "19DsKllFinalizeSketchEPN10impala_udf15FunctionContextERKNS1_" +
+            "9StringValE", true, false, true));
+
     // The following 3 functions are never directly executed because they get rewritten
     db.addBuiltin(AggregateFunction.createAnalyticBuiltin(
         db, "percent_rank", Lists.<Type>newArrayList(), Type.DOUBLE, Type.STRING));
diff --git a/testdata/data/README b/testdata/data/README
index 63c2d7d..41ddeac 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -509,6 +509,16 @@ hll_sketches_from_impala.parquet:
 This holds the same sketches as hll_sketches_from_hive.parquet but these sketches were
 created by Impala instead of Hive.
 
+kll_sketches_from_hive.parquet:
+This file contains a table that has some string columns to store serialized Apache
+DataSketches KLL sketches created by Hive. Each column is for a different purpose:
+  - 'f': Float with distinct values.
+  - 'repetitions': Float with some repetition in the values.
+  - 'some_nulls': Float values and some NULLs.
+  - 'all_nulls': All values are NULLs.
+  - 'some_nans': Floats with some NaN values.
+  - 'all_nans': All values are NaNs.
+
 hudi_parquet:
 IMPALA-8778: Support read Apache Hudi tables
 Hudi parquet is a special format of parquet files managed by Apache Hudi
diff --git a/testdata/data/kll_sketches_from_hive.parquet b/testdata/data/kll_sketches_from_hive.parquet
new file mode 100644
index 0000000..8842981
Binary files /dev/null and b/testdata/data/kll_sketches_from_hive.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
new file mode 100644
index 0000000..b7b734b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
@@ -0,0 +1,146 @@
+====
+---- QUERY
+# Check that ds_kll_quantile returns error for strings that are not serialized sketches.
+select ds_kll_quantile(date_string_col, 0.5) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch
+====
+---- QUERY
+select ds_kll_quantile(ds_kll_sketch(float_col), -0.1) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Rank parameter should be in the range of [0,1]
+====
+---- QUERY
+select ds_kll_quantile(ds_kll_sketch(float_col), 1.1) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Rank parameter should be in the range of [0,1]
+====
+---- QUERY
+select
+    ds_kll_quantile(ds_kll_sketch(id), 0),
+    ds_kll_quantile(ds_kll_sketch(tinyint_col), 0),
+    ds_kll_quantile(ds_kll_sketch(smallint_col), 0),
+    ds_kll_quantile(ds_kll_sketch(int_col), 0),
+    ds_kll_quantile(ds_kll_sketch(bigint_col), 0),
+    ds_kll_quantile(ds_kll_sketch(float_col), 0)
+from functional_parquet.alltypestiny;
+---- RESULTS
+0,0,0,0,0,0
+---- TYPES
+FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
+====
+---- QUERY
+select
+    ds_kll_quantile(ds_kll_sketch(id), 0.5),
+    ds_kll_quantile(ds_kll_sketch(tinyint_col), 0.5),
+    ds_kll_quantile(ds_kll_sketch(smallint_col), 0.5),
+    ds_kll_quantile(ds_kll_sketch(int_col), 0.5),
+    ds_kll_quantile(ds_kll_sketch(bigint_col), 0.5),
+    ds_kll_quantile(ds_kll_sketch(float_col), 0.5)
+from functional_parquet.alltypestiny;
+---- RESULTS
+4,1,1,1,10,1.100000023841858
+---- TYPES
+FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
+====
+---- QUERY
+select
+    ds_kll_quantile(ds_kll_sketch(id), 1),
+    ds_kll_quantile(ds_kll_sketch(tinyint_col), 1),
+    ds_kll_quantile(ds_kll_sketch(smallint_col), 1),
+    ds_kll_quantile(ds_kll_sketch(int_col), 1),
+    ds_kll_quantile(ds_kll_sketch(bigint_col), 1),
+    ds_kll_quantile(ds_kll_sketch(float_col), 1)
+from functional_parquet.alltypestiny;
+---- RESULTS
+7,1,1,1,10,1.100000023841858
+---- TYPES
+FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
+====
+---- QUERY
+select ds_kll_sketch(double_col) from functional_parquet.alltypestiny;
+---- CATCH
+AnalysisException: No matching function with signature: ds_kll_sketch(DOUBLE)
+====
+---- QUERY
+select ds_kll_sketch(string_col) from functional_parquet.alltypestiny;
+---- CATCH
+AnalysisException: No matching function with signature: ds_kll_sketch(STRING)
+====
+---- QUERY
+select ds_kll_sketch(timestamp_col) from functional_parquet.alltypestiny;
+---- CATCH
+AnalysisException: No matching function with signature: ds_kll_sketch(TIMESTAMP)
+====
+---- QUERY
+select ds_kll_sketch(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypestiny;
+---- CATCH
+AnalysisException: No matching function with signature: ds_kll_sketch(DATE)
+====
+---- QUERY
+# Check that ds_kll_quantile() returns null for null inputs.
+select ds_kll_quantile(c, 0.5) from functional_parquet.nulltable;
+---- RESULTS
+NULL
+---- TYPES
+FLOAT
+====
+---- QUERY
+# Check that ds_kll_sketch() returns null for null inputs.
+select ds_kll_sketch(d) from functional_parquet.nulltable;
+---- RESULTS
+'NULL'
+---- TYPES
+STRING
+====
+---- QUERY
+# Check that ds_kll_sketch() returns null for empty input.
+select ds_kll_sketch(f2) from functional_parquet.emptytable;
+---- RESULTS
+'NULL'
+---- TYPES
+STRING
+====
+---- QUERY
+# Write sketches to a table as string and get an estimate from the written sketch.
+# Note, the plan is to write sketches as binary instead of strings. For this we have to
+# wait for the binary support (IMPALA-9482).
+create table sketch_store
+    (year int, month int, float_sketch string)
+stored as parquet;
+insert into sketch_store
+    select
+        year,
+        month,
+        ds_kll_sketch(float_col)
+    from functional_parquet.alltypessmall
+    group by year, month;
+select
+    year,
+    month,
+    ds_kll_quantile(float_sketch, 0.5)
+from sketch_store;
+---- RESULTS
+2009,1,4.400000095367432
+2009,2,4.400000095367432
+2009,3,4.400000095367432
+2009,4,4.400000095367432
+---- TYPES
+INT,INT,FLOAT
+====
+---- QUERY
+# Check that sketches made by Hive can be read and used for estimating by Impala.
+select
+    ds_kll_quantile(f, 0.5) as f,
+    ds_kll_quantile(repetitions, 0.5) as r,
+    ds_kll_quantile(some_nulls, 0.5) as sn,
+    ds_kll_quantile(all_nulls, 0.5) as an,
+    ds_kll_quantile(some_nans, 0.5) as snan,
+    ds_kll_quantile(all_nans, 0.5) as anan
+from kll_sketches_from_hive;
+---- TYPES
+FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
+---- RESULTS
+100.1999969482422,25000.099609375,50.90000152587891,NULL,50.5,NULL
+====
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index 53d051c..1634387 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -36,3 +36,7 @@ class TestDatasketches(ImpalaTestSuite):
     create_table_from_parquet(self.client, unique_database, 'hll_sketches_from_hive')
     create_table_from_parquet(self.client, unique_database, 'hll_sketches_from_impala')
     self.run_test_case('QueryTest/datasketches-hll', vector, unique_database)
+
+  def test_kll(self, vector, unique_database):
+    create_table_from_parquet(self.client, unique_database, 'kll_sketches_from_hive')
+    self.run_test_case('QueryTest/datasketches-kll', vector, unique_database)