You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/02/18 09:10:53 UTC

[impala] 01/03: IMPALA-10463: Implement ds_theta_sketch() and ds_theta_estimate() functions

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 65c6a81ed908084da2175e97664c0f76ded9aed8
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Thu Jan 21 10:12:59 2021 +0800

    IMPALA-10463: Implement ds_theta_sketch() and ds_theta_estimate() functions
    
    These functions can be used to get cardinality estimates of data
    using Theta algorithm from Apache DataSketches. ds_theta_sketch()
    receives a dataset, e.g. a column from a table, and returns a
    serialized Theta sketch in string format. This can be written to a
    table or be fed directly to ds_theta_estimate() that returns the
    cardinality estimate for that sketch.
    
    Similar to the HLL sketch, the primary use-case for the Theta sketch
    is for counting distinct values as a stream, and then merging
    multiple sketches together for a total distinct count.
    
    For more details about Apache DataSketches' Theta see:
    https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html
    
    Testing:
     - Added some tests running estimates for small datasets where the
       amount of data is small enough to get the correct results.
     - Ran manual tests on tpch25_parquet.lineitem to compare performance
       with ds_hll_*. ds_theta_* is faster than ds_hll_* on the original
       data, the difference is around 1%-10%. ds_hll_estimate() is faster
       than ds_theta_estimate() on an existing sketch. HLL and Theta give
       close estimates except for strings, see IMPALA-10464.
    
    Change-Id: I14f24c16b815eec75cf90bb92c8b8b0363dcbfbc
    Reviewed-on: http://gerrit.cloudera.org:8080/17008
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             | 166 +++++++++++++++++++
 be/src/exprs/aggregate-functions-test.cc           |  15 ++
 be/src/exprs/aggregate-functions.h                 |   9 ++
 be/src/exprs/datasketches-functions-ir.cc          |  18 +++
 be/src/exprs/datasketches-functions.h              |   6 +
 common/function-registry/impala_functions.py       |   2 +
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  43 +++++
 testdata/data/README                               |   6 +
 testdata/data/theta_sketches_from_hive.parquet     | Bin 0 -> 5220 bytes
 .../queries/QueryTest/datasketches-theta.test      | 178 +++++++++++++++++++++
 tests/query_test/test_datasketches.py              |   4 +
 11 files changed, 447 insertions(+)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index d0afd51..fd47061 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,6 +40,8 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/theta_sketch.hpp"
+#include "thirdparty/datasketches/theta_union.hpp"
 #include "thirdparty/datasketches/kll_sketch.hpp"
 #include "util/arithmetic-util.h"
 #include "util/mpfit-util.h"
@@ -1637,6 +1639,27 @@ StringVal SerializeDsHllUnion(FunctionContext* ctx,
   return SerializeCompactDsHllSketch(ctx, sketch);
 }
 
+/// Writes the binary serialization of 'sketch' (an update or a compact Theta sketch)
+/// into an in-memory stream and converts the stream contents into a StringVal through
+/// StringStreamToStringVal().
+/// Defined in the .cc rather than the header so that the header does not have to pull
+/// in the DataSketches Theta headers.
+StringVal SerializeDsThetaSketch(
+    FunctionContext* ctx, const datasketches::theta_sketch& sketch) {
+  std::stringstream out(std::ios::binary | std::ios::out | std::ios::in);
+  sketch.serialize(out);
+  return StringStreamToStringVal(ctx, out);
+}
+
+/// Takes the current result of 'ds_union' (returned by get_result() as a compact Theta
+/// sketch) and serializes it into a StringVal via SerializeDsThetaSketch().
+/// Defined in the .cc rather than the header so that the header does not have to pull
+/// in the DataSketches Theta headers.
+StringVal SerializeDsThetaUnion(
+    FunctionContext* ctx, const datasketches::theta_union& ds_union) {
+  // The temporary compact sketch lives until the end of this full-expression, which
+  // covers the whole serialization call.
+  return SerializeDsThetaSketch(ctx, ds_union.get_result());
+}
+
 /// Auxiliary function that receives a kll_sketch<float> and returns the serialized
 /// version of it wrapped into a StringVal.
 /// Introducing this function in the .cc to avoid including the whole DataSketches HLL
@@ -1812,6 +1835,132 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
   return result;
 }
 
+void AggregateFunctions::DsThetaInit(FunctionContext* ctx, StringVal* dst) {
+  // The slot stores only the update_theta_sketch object itself. The sketch keeps the
+  // inserted data in its own heap allocations, so the slot size is the same no matter
+  // how much data is added later.
+  AllocBuffer(ctx, dst, sizeof(datasketches::update_theta_sketch));
+  if (UNLIKELY(dst->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  // Build an empty sketch with default settings and copy-construct it into the slot.
+  const datasketches::update_theta_sketch empty_sketch =
+      datasketches::update_theta_sketch::builder().build();
+  std::uninitialized_fill_n(
+      reinterpret_cast<datasketches::update_theta_sketch*>(dst->ptr), 1, empty_sketch);
+}
+
+template <typename T>
+void AggregateFunctions::DsThetaUpdate(
+    FunctionContext* ctx, const T& src, StringVal* dst) {
+  // NULL inputs leave the sketch untouched.
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::update_theta_sketch));
+  // 'dst' holds the update_theta_sketch constructed by DsThetaInit(); feed it the
+  // primitive value.
+  reinterpret_cast<datasketches::update_theta_sketch*>(dst->ptr)->update(src.val);
+}
+
+// Specialization for StringVal: the sketch is updated with the raw bytes of the
+// string. NULL and empty strings are skipped.
+template <>
+void AggregateFunctions::DsThetaUpdate(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  const bool nothing_to_add = src.is_null || src.len == 0;
+  if (nothing_to_add) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::update_theta_sketch));
+  auto* sketch = reinterpret_cast<datasketches::update_theta_sketch*>(dst->ptr);
+  sketch->update(reinterpret_cast<char*>(src.ptr), src.len);
+}
+
+/// Serializes the intermediate state in 'src' into a Theta sketch string and releases
+/// 'src'. 'src' holds either an update_theta_sketch (from DsThetaInit()/Update()) or a
+/// theta_union (after DsThetaMerge()); the two are told apart by the buffer length.
+StringVal AggregateFunctions::DsThetaSerialize(
+    FunctionContext* ctx, const StringVal& src) {
+  using UpdateSketch = datasketches::update_theta_sketch;
+  using ThetaUnion = datasketches::theta_union;
+  DCHECK(!src.is_null);
+  DCHECK(src.len == sizeof(UpdateSketch) || src.len == sizeof(ThetaUnion));
+  StringVal dst;
+  if (src.len == sizeof(UpdateSketch)) {
+    // Cast to the type that was actually constructed in the buffer; the reference
+    // binding in the call below performs the upcast to theta_sketch.
+    auto sketch_ptr = reinterpret_cast<UpdateSketch*>(src.ptr);
+    dst = SerializeDsThetaSketch(ctx, *sketch_ptr);
+    // The sketch keeps its data in separate heap allocations. Run its destructor so
+    // that memory is released too, not only the buffer holding the object.
+    sketch_ptr->~UpdateSketch();
+  } else {
+    auto union_ptr = reinterpret_cast<ThetaUnion*>(src.ptr);
+    dst = SerializeDsThetaUnion(ctx, *union_ptr);
+    // Likewise destroy the union, assumed to own heap memory the same way.
+    union_ptr->~ThetaUnion();
+  }
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+/// Merges the serialized Theta sketch in 'src' into the intermediate state in 'dst'.
+/// If 'dst' still holds the update_theta_sketch created by DsThetaInit(), it is
+/// replaced by a theta_union holding both inputs. Empty 'src' sketches are ignored.
+void AggregateFunctions::DsThetaMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  using UpdateSketch = datasketches::update_theta_sketch;
+  using ThetaUnion = datasketches::theta_union;
+  // The kind of object held by 'dst' is inferred from its length, which is only
+  // unambiguous if the two possible types differ in size.
+  static_assert(sizeof(UpdateSketch) != sizeof(ThetaUnion),
+      "update_theta_sketch and theta_union must have different sizes");
+  DCHECK(!src.is_null);
+  DCHECK(!dst->is_null);
+  DCHECK(dst->len == sizeof(UpdateSketch) || dst->len == sizeof(ThetaUnion));
+
+  // Note, 'src' is a serialized theta_sketch.
+  auto src_sketch =
+      datasketches::theta_sketch::deserialize(static_cast<void*>(src.ptr), src.len);
+  if (src_sketch->is_empty()) return;
+
+  if (dst->len == sizeof(ThetaUnion)) {
+    auto dst_union_ptr = reinterpret_cast<ThetaUnion*>(dst->ptr);
+    dst_union_ptr->update(*src_sketch);
+  } else if (dst->len == sizeof(UpdateSketch)) {
+    auto dst_sketch_ptr = reinterpret_cast<UpdateSketch*>(dst->ptr);
+
+    ThetaUnion union_sketch = ThetaUnion::builder().build();
+    union_sketch.update(*src_sketch);
+    union_sketch.update(*dst_sketch_ptr);
+
+    // theta_union.get_result() returns a compact sketch, which does not support
+    // updating and is inconsistent with the initial underlying type of dst. This is
+    // different from the HLL sketch. Here use theta_union as the underlying type of
+    // dst. The old sketch owns heap memory outside of the buffer, so destroy it before
+    // freeing the buffer to avoid leaking that memory.
+    dst_sketch_ptr->~UpdateSketch();
+    ctx->Free(dst->ptr);
+    AllocBuffer(ctx, dst, sizeof(ThetaUnion));
+    if (UNLIKELY(dst->is_null)) {
+      DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+      return;
+    }
+    auto union_ptr = reinterpret_cast<ThetaUnion*>(dst->ptr);
+    std::uninitialized_fill_n(union_ptr, 1, union_sketch);
+  }
+}
+
+/// Returns the distinct-count estimate of the intermediate state in 'src' and releases
+/// 'src'. 'src' holds either an update_theta_sketch or a theta_union, told apart by
+/// the buffer length.
+BigIntVal AggregateFunctions::DsThetaFinalize(
+    FunctionContext* ctx, const StringVal& src) {
+  using UpdateSketch = datasketches::update_theta_sketch;
+  using ThetaUnion = datasketches::theta_union;
+  DCHECK(!src.is_null);
+  DCHECK(src.len == sizeof(UpdateSketch) || src.len == sizeof(ThetaUnion));
+  BigIntVal estimate;
+  if (src.len == sizeof(UpdateSketch)) {
+    // Cast to the type that was actually constructed in the buffer.
+    auto sketch_ptr = reinterpret_cast<UpdateSketch*>(src.ptr);
+    // The double estimate is truncated towards zero when stored as a BIGINT.
+    estimate = sketch_ptr->get_estimate();
+    // Release the sketch's own heap allocations before freeing the buffer.
+    sketch_ptr->~UpdateSketch();
+  } else {
+    auto union_ptr = reinterpret_cast<ThetaUnion*>(src.ptr);
+    estimate = union_ptr->get_result().get_estimate();
+    union_ptr->~ThetaUnion();
+  }
+  ctx->Free(src.ptr);
+  return estimate;
+}
+
+/// Returns the intermediate state in 'src' as a serialized Theta sketch string and
+/// releases 'src'. 'src' holds either an update_theta_sketch or a theta_union, told
+/// apart by the buffer length.
+StringVal AggregateFunctions::DsThetaFinalizeSketch(
+    FunctionContext* ctx, const StringVal& src) {
+  using UpdateSketch = datasketches::update_theta_sketch;
+  using ThetaUnion = datasketches::theta_union;
+  DCHECK(!src.is_null);
+  DCHECK(src.len == sizeof(UpdateSketch) || src.len == sizeof(ThetaUnion));
+  StringVal result;
+  if (src.len == sizeof(UpdateSketch)) {
+    // Cast to the type that was actually constructed in the buffer; the reference
+    // binding in the call below performs the upcast to theta_sketch.
+    auto sketch_ptr = reinterpret_cast<UpdateSketch*>(src.ptr);
+    result = SerializeDsThetaSketch(ctx, *sketch_ptr);
+    // Release the sketch's own heap allocations before freeing the buffer.
+    sketch_ptr->~UpdateSketch();
+  } else {
+    auto union_ptr = reinterpret_cast<ThetaUnion*>(src.ptr);
+    result = SerializeDsThetaUnion(ctx, *union_ptr);
+    union_ptr->~ThetaUnion();
+  }
+  ctx->Free(src.ptr);
+  return result;
+}
+
 void AggregateFunctions::DsKllInitHelper(FunctionContext* ctx, StringVal* slot) {
   AllocBuffer(ctx, slot, sizeof(datasketches::kll_sketch<float>));
   if (UNLIKELY(slot->is_null)) {
@@ -2901,6 +3050,23 @@ template void AggregateFunctions::DsHllUpdate(
 template void AggregateFunctions::DsHllUpdate(
     FunctionContext*, const DateVal&, StringVal*);
 
+// Explicit instantiations of DsThetaUpdate() for the non-string argument types. The
+// StringVal case is covered by the full specialization defined earlier in this file.
+// BooleanVal, SmallIntVal and DateVal are instantiated here even though
+// BuiltinsDb.java does not register ds_theta_sketch() for those types in this change.
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const BooleanVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const TinyIntVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const SmallIntVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const IntVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const BigIntVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const FloatVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const DoubleVal&, StringVal*);
+template void AggregateFunctions::DsThetaUpdate(
+    FunctionContext*, const DateVal&, StringVal*);
+
 template void AggregateFunctions::SampledNdvUpdate(
     FunctionContext*, const BooleanVal&, const DoubleVal&, StringVal*);
 template void AggregateFunctions::SampledNdvUpdate(
diff --git a/be/src/exprs/aggregate-functions-test.cc b/be/src/exprs/aggregate-functions-test.cc
index 76ffb67..b3f3b86 100644
--- a/be/src/exprs/aggregate-functions-test.cc
+++ b/be/src/exprs/aggregate-functions-test.cc
@@ -162,4 +162,19 @@ TEST(HistogramTest, TestString) {
   EXPECT_TRUE(test.Execute(input, StringVal(&expected[0]))) << test.GetErrorMsg();
 }
 
+TEST(DsThetaSketch, DataToSketch) {
+  UdaTestHarness<BigIntVal, StringVal, IntVal> test(AggregateFunctions::DsThetaInit,
+      AggregateFunctions::DsThetaUpdate<IntVal>, AggregateFunctions::DsThetaMerge,
+      AggregateFunctions::DsThetaSerialize, AggregateFunctions::DsThetaFinalize);
+  std::vector<IntVal> input;
+
+  // No input rows: the estimate of an empty sketch is 0.
+  EXPECT_TRUE(test.Execute(input, BigIntVal(0)))
+      << "DsThetaSketch empty: " << test.GetErrorMsg();
+
+  // NULL inputs are skipped by DsThetaUpdate(), so they do not add to the count.
+  input.push_back(IntVal::null());
+  EXPECT_TRUE(test.Execute(input, BigIntVal(0)))
+      << "DsThetaSketch nulls: " << test.GetErrorMsg();
+
+  // Six distinct values; the dataset is small enough to get the exact result.
+  for (int key = 0; key < 6; key++) input.push_back(key);
+  EXPECT_TRUE(test.Execute(input, BigIntVal(6)))
+      << "DsThetaSketch: " << test.GetErrorMsg();
+
+  // Repeating every value must not change the distinct count, also when the harness
+  // splits the input and merges the partial sketches.
+  for (int key = 0; key < 6; key++) input.push_back(key);
+  EXPECT_TRUE(test.Execute(input, BigIntVal(6)))
+      << "DsThetaSketch duplicates: " << test.GetErrorMsg();
+}
+
 IMPALA_TEST_MAIN();
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 9c7cdaf..6c41493 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -250,6 +250,15 @@ class AggregateFunctions {
   static void DsHllUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static StringVal DsHllUnionFinalize(FunctionContext*, const StringVal& src);
 
+  /// These functions implement Apache DataSketches Theta support for sketching.
+  /// The intermediate StringVal holds a datasketches::update_theta_sketch (created by
+  /// DsThetaInit() and filled by DsThetaUpdate()). After DsThetaMerge() has merged a
+  /// non-empty sketch into it, it holds a datasketches::theta_union instead. The two
+  /// are told apart by the length of the StringVal.
+  /// DsThetaSerialize() and DsThetaFinalizeSketch() return the serialized sketch.
+  /// DsThetaFinalize() returns the cardinality estimate.
+  static void DsThetaInit(FunctionContext*, StringVal* slot);
+  template <typename T>
+  static void DsThetaUpdate(FunctionContext*, const T& src, StringVal* dst);
+  static StringVal DsThetaSerialize(FunctionContext*, const StringVal& src);
+  static void DsThetaMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static BigIntVal DsThetaFinalize(FunctionContext*, const StringVal& src);
+  static StringVal DsThetaFinalizeSketch(FunctionContext*, const StringVal& src);
+
   /// These functions implement Apache DataSketches KLL support for sketching.
   static void DsKllInit(FunctionContext*, StringVal* slot);
   static void DsKllUpdate(FunctionContext*, const FloatVal& src, StringVal* dst);
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index e5d02cb..df390bd 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -20,6 +20,7 @@
 #include "exprs/datasketches-common.h"
 #include "gutil/strings/substitute.h"
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/theta_sketch.hpp"
 #include "thirdparty/datasketches/kll_sketch.hpp"
 #include "udf/udf-internal.h"
 
@@ -104,6 +105,23 @@ StringVal DataSketchesFunctions::DsHllStringify(FunctionContext* ctx,
   return dst;
 }
 
+BigIntVal DataSketchesFunctions::DsThetaEstimate(
+    FunctionContext* ctx, const StringVal& serialized_sketch) {
+  // NULL or empty input yields an estimate of 0.
+  const bool no_input = serialized_sketch.is_null || serialized_sketch.len == 0;
+  if (no_input) return 0;
+  try {
+    // The bytes may hold either a serialized update_theta_sketch or a serialized
+    // compact_theta_sketch; theta_sketch::deserialize() reads either.
+    const auto sketch = datasketches::theta_sketch::deserialize(
+        static_cast<void*>(serialized_sketch.ptr), serialized_sketch.len);
+    return sketch->get_estimate();
+  } catch (const std::exception&) {
+    // Deserialization throws e.g. when the input string is not a serialized sketch.
+    LogSketchDeserializationError(ctx);
+    return BigIntVal::null();
+  }
+}
+
 FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
     const StringVal& serialized_sketch, const DoubleVal& rank) {
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null();
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index de453b8..3a8036e 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -64,6 +64,12 @@ public:
   static StringVal DsHllStringify(FunctionContext* ctx,
       const StringVal& serialized_sketch);
 
+  /// 'serialized_sketch' is expected as a serialized Apache DataSketches Theta sketch
+  /// (either an update or a compact sketch). If it is not, then the query fails.
+  /// Otherwise, returns the count(distinct) estimate from the sketch. Returns 0 if
+  /// 'serialized_sketch' is NULL or an empty string.
+  static BigIntVal DsThetaEstimate(
+      FunctionContext* ctx, const StringVal& serialized_sketch);
+
   /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
   /// it is not, then the query fails. 'rank' is used to identify which item (estimate)
   /// to return from the sketched dataset. E.g. 0.1 means the item where 10% of the
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index d95424f..ee62062 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -1003,6 +1003,8 @@ visible_functions = [
       '_ZN6impala21DataSketchesFunctions11DsHllUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['ds_hll_stringify'], 'STRING', ['STRING'],
       '_ZN6impala21DataSketchesFunctions14DsHllStringifyEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
+  [['ds_theta_estimate'], 'BIGINT', ['STRING'],
+     '_ZN6impala21DataSketchesFunctions15DsThetaEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
   [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
       '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
   [['ds_kll_n'], 'BIGINT', ['STRING'],
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 9094a31..fd35538 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -393,6 +393,22 @@ public class BuiltinsDb extends Db {
             "11DsHllUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
         .build();
 
+  // Mangled symbol names (without the class prefix) of the
+  // AggregateFunctions::DsThetaUpdate() template instances, keyed by the argument type
+  // they accept. Only types present in this map get a working ds_theta_sketch() and
+  // ds_theta_sketch_and_estimate(); all other types are registered as unsupported.
+  private static final Map<Type, String> DS_THETA_UPDATE_SYMBOL =
+      ImmutableMap.<Type, String>builder()
+        .put(Type.TINYINT,
+            "13DsThetaUpdateIN10impala_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.INT,
+            "13DsThetaUpdateIN10impala_udf6IntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.BIGINT,
+            "13DsThetaUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.FLOAT,
+            "13DsThetaUpdateIN10impala_udf8FloatValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.DOUBLE,
+            "13DsThetaUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+        .put(Type.STRING,
+            "13DsThetaUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
+        .build();
+
   private static final Map<Type, String> SAMPLED_NDV_UPDATE_SYMBOL =
       ImmutableMap.<Type, String>builder()
         .put(Type.BOOLEAN,
@@ -1078,6 +1094,33 @@ public class BuiltinsDb extends Db {
             Lists.newArrayList(t), Type.STRING, Type.STRING));
       }
 
+      // DataSketches Theta. Argument types listed in DS_THETA_UPDATE_SYMBOL get a real
+      // implementation. ds_theta_sketch_and_estimate() finalizes to the BIGINT estimate
+      // (DsThetaFinalize), while ds_theta_sketch() finalizes to the serialized sketch as
+      // a STRING (DsThetaFinalizeSketch). Both share the same init/update/merge/serialize
+      // symbols. Any other argument type is registered as unsupported, so such calls are
+      // rejected during analysis.
+      if (DS_THETA_UPDATE_SYMBOL.containsKey(t)) {
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_sketch_and_estimate",
+                Lists.newArrayList(t), Type.BIGINT, Type.STRING,
+                prefix + "11DsThetaInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+                prefix + DS_THETA_UPDATE_SYMBOL.get(t),
+                prefix + "12DsThetaMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+                prefix + "16DsThetaSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+                prefix + "15DsThetaFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+                true, false, true));
+
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_sketch",
+                Lists.newArrayList(t), Type.STRING, Type.STRING,
+                prefix + "11DsThetaInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+                prefix + DS_THETA_UPDATE_SYMBOL.get(t),
+                prefix + "12DsThetaMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+                prefix + "16DsThetaSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+                prefix + "21DsThetaFinalizeSketchEPN10impala_udf15FunctionContextERKNS1_" +
+                        "9StringValE", true, false, true));
+      } else {
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db,
+                "ds_theta_sketch_and_estimate", Lists.newArrayList(t), Type.STRING,
+                Type.STRING));
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db, "ds_theta_sketch",
+                Lists.newArrayList(t), Type.STRING, Type.STRING));
+      }
+
       // SAMPLED_NDV.
       // Size needs to be kept in sync with SampledNdvState in the BE.
       int NUM_HLL_BUCKETS = 32;
diff --git a/testdata/data/README b/testdata/data/README
index 0121cef..c522f75 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -509,6 +509,12 @@ hll_sketches_from_impala.parquet:
 This holds the same sketches as hll_sketches_from_hive.parquet but these sketches were
 created by Impala instead of Hive.
 
+theta_sketches_from_hive.parquet:
+This file contains a table that has some string columns to store serialized Apache
+DataSketches Theta sketches created by Hive. Each column contains a sketch for a
+specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
+STRING, CHAR and VARCHAR. Has an additional column for NULL values.
+
 kll_sketches_from_hive.parquet:
 This file contains a table that has some string columns to store serialized Apache
 DataSketches KLL sketches created by Hive. Each column is for a different purpose:
diff --git a/testdata/data/theta_sketches_from_hive.parquet b/testdata/data/theta_sketches_from_hive.parquet
new file mode 100644
index 0000000..70fb5e2
Binary files /dev/null and b/testdata/data/theta_sketches_from_hive.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
new file mode 100644
index 0000000..2056e8c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
@@ -0,0 +1,178 @@
+====
+---- QUERY
+# Use a small table for testing Datasketches Theta functions through Impala to make sure
+# that these approximate functions give the correct result. For testing Impala
+# functionality there is no need to test how Datasketches Theta approximates
+# count distinct values, so a small table is enough.
+select
+    ds_theta_estimate(ds_theta_sketch(tinyint_col)),
+    ds_theta_estimate(ds_theta_sketch(int_col)),
+    ds_theta_estimate(ds_theta_sketch(bigint_col)),
+    ds_theta_estimate(ds_theta_sketch(float_col)),
+    ds_theta_estimate(ds_theta_sketch(double_col)),
+    ds_theta_estimate(ds_theta_sketch(string_col))
+from functional_parquet.alltypessmall;
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+select
+    ds_theta_sketch_and_estimate(tinyint_col),
+    ds_theta_sketch_and_estimate(int_col),
+    ds_theta_sketch_and_estimate(bigint_col),
+    ds_theta_sketch_and_estimate(float_col),
+    ds_theta_sketch_and_estimate(double_col),
+    ds_theta_sketch_and_estimate(string_col)
+from functional_parquet.alltypessmall;
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that unsupported types give error with ds_theta_sketch().
+select ds_theta_sketch(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch(BOOLEAN)
+====
+---- QUERY
+select ds_theta_sketch(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch(SMALLINT)
+====
+---- QUERY
+select ds_theta_sketch(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch(DATE)
+====
+---- QUERY
+select ds_theta_sketch(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch(DECIMAL(9,0))
+====
+---- QUERY
+# Check that unsupported types give error with ds_theta_sketch_and_estimate().
+select ds_theta_sketch_and_estimate(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch_and_estimate(BOOLEAN)
+====
+---- QUERY
+select ds_theta_sketch_and_estimate(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch_and_estimate(SMALLINT)
+====
+---- QUERY
+select ds_theta_sketch_and_estimate(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch_and_estimate(DATE)
+====
+---- QUERY
+select ds_theta_sketch_and_estimate(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_theta_sketch_and_estimate(DECIMAL(9,0))
+====
+---- QUERY
+# Check if Theta works with null values.
+select
+    ds_theta_estimate(ds_theta_sketch(null_str)),
+    ds_theta_estimate(ds_theta_sketch(null_int)),
+    ds_theta_estimate(ds_theta_sketch(null_double)),
+    ds_theta_estimate(ds_theta_sketch(some_nulls)),
+    ds_theta_sketch_and_estimate(null_str),
+    ds_theta_sketch_and_estimate(null_int),
+    ds_theta_sketch_and_estimate(null_double),
+    ds_theta_sketch_and_estimate(some_nulls)
+from functional_parquet.nullrows;
+---- RESULTS
+0,0,0,6,0,0,0,6
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check if Theta works for empty datasets.
+select
+    ds_theta_estimate(ds_theta_sketch(field)),
+    ds_theta_estimate(ds_theta_sketch(f2)),
+    ds_theta_sketch_and_estimate(field),
+    ds_theta_sketch_and_estimate(f2)
+from functional_parquet.emptytable;
+---- RESULTS
+0,0,0,0
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Write sketches to a table as string and get an estimate from the written sketch.
+# Note, the plan is to write sketches as binary instead of strings. For this we have to
+# wait for the binary support (IMPALA-9482).
+create table sketch_store
+    (year int, month int, date_sketch string, float_sketch string)
+stored as parquet;
+insert into sketch_store
+    select
+        year,
+        month,
+        ds_theta_sketch(date_string_col),
+        ds_theta_sketch(float_col)
+    from functional_parquet.alltypessmall
+    group by year, month;
+select
+    year,
+    month,
+    ds_theta_estimate(date_sketch),
+    ds_theta_estimate(float_sketch)
+from sketch_store order by month;
+---- RESULTS
+2009,1,3,10
+2009,2,3,10
+2009,3,3,10
+2009,4,3,10
+---- TYPES
+INT,INT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that ds_theta_estimate returns error for strings that are not serialized sketches.
+select ds_theta_estimate(date_string_col) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Check that ds_theta_estimate returns an error for HLL serialized sketches.
+create table hll_sketch_store (date_sketch string) stored as parquet;
+insert into hll_sketch_store
+    select ds_hll_sketch(date_string_col)
+    from functional_parquet.alltypessmall;
+select ds_theta_estimate(date_sketch) from hll_sketch_store;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Check that ds_theta_estimate returns 0 for null and empty string inputs.
+select ds_theta_estimate(b), ds_theta_estimate(c) from functional_parquet.nulltable;
+---- RESULTS
+0,0
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+# Check that sketches made by Hive can be read and used for estimating by Impala.
+select
+    ds_theta_estimate(ti) as ti,
+    ds_theta_estimate(i) as i,
+    ds_theta_estimate(bi) as bi,
+    ds_theta_estimate(f) as f,
+    ds_theta_estimate(d) as d,
+    ds_theta_estimate(s) as s,
+    ds_theta_estimate(c) as c,
+    ds_theta_estimate(v) as v,
+    ds_theta_estimate(nc) as nc
+from theta_sketches_from_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index a0f655e..bc99cc5 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -37,6 +37,10 @@ class TestDatasketches(ImpalaTestSuite):
     create_table_from_parquet(self.client, unique_database, 'hll_sketches_from_impala')
     self.run_test_case('QueryTest/datasketches-hll', vector, unique_database)
 
+  def test_theta(self, vector, unique_database):
+    create_table_from_parquet(self.client, unique_database, 'theta_sketches_from_hive')
+    self.run_test_case('QueryTest/datasketches-theta', vector, unique_database)
+
   def test_kll(self, vector, unique_database):
     create_table_from_parquet(self.client, unique_database, 'kll_sketches_from_hive')
     create_table_from_parquet(self.client, unique_database, 'kll_sketches_from_impala')