You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/11 04:06:42 UTC

[impala] branch master updated (a005778 -> f95f794)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from a005778  IMPALA-9478: Profiles should indicate if custom UDFs are being used
     new 9944355  IMPALA-10039: Fixed Expr-test crash caused by thread unsafe function
     new f95f794  IMPALA-10017: Implement ds_kll_union() function

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/aggregate-functions-ir.cc             | 120 +++++++++++++++++----
 be/src/exprs/aggregate-functions.h                 |  15 +++
 be/src/runtime/exec-env.cc                         |   2 +-
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  15 +++
 testdata/data/README                               |   4 +
 testdata/data/kll_sketches_from_impala.parquet     | Bin 0 -> 2853 bytes
 .../queries/QueryTest/datasketches-kll.test        |  68 ++++++++++--
 tests/query_test/test_datasketches.py              |   1 +
 8 files changed, 195 insertions(+), 30 deletions(-)
 create mode 100644 testdata/data/kll_sketches_from_impala.parquet


[impala] 02/02: IMPALA-10017: Implement ds_kll_union() function

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f95f7940e4a290d75ee85fd78e85bc26795f0f9f
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Thu Jul 30 17:35:12 2020 +0200

    IMPALA-10017: Implement ds_kll_union() function
    
    This function receives a set of serialized Apache DataSketches KLL
    sketches produced by ds_kll_sketch() and merges them into a single
    sketch.
    
    An example usage is to create a sketch for each partition of a table,
    write these sketches to a separate table and, depending on which
    partitions the user is interested in, union the relevant sketches
    together to get an estimate. E.g.:
      SELECT
          ds_kll_quantile(ds_kll_union(sketch_col), 0.5)
      FROM sketch_tbl
      WHERE partition_col=1 OR partition_col=5;
    
    Testing:
      - Apart from the automated tests I added to this patch, I also
        tested ds_kll_union() on a bigger dataset to check that the
        serialization, deserialization and merging steps work well. I
        took TPCH25.lineitem, created a number of sketches grouped
        by l_shipdate and called ds_kll_union() on those sketches.
    
    Change-Id: I020aea28d36f9b6ef9fb57c08411f2170f5c24bf
    Reviewed-on: http://gerrit.cloudera.org:8080/16267
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             | 120 +++++++++++++++++----
 be/src/exprs/aggregate-functions.h                 |  15 +++
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  15 +++
 testdata/data/README                               |   4 +
 testdata/data/kll_sketches_from_impala.parquet     | Bin 0 -> 2853 bytes
 .../queries/QueryTest/datasketches-kll.test        |  68 ++++++++++--
 tests/query_test/test_datasketches.py              |   1 +
 7 files changed, 194 insertions(+), 29 deletions(-)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 1762860..48b2191 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1792,8 +1792,11 @@ void AggregateFunctions::DsHllUnionMerge(
   DCHECK_EQ(dst->len, sizeof(datasketches::hll_union));
 
   // Note, 'src' is a serialized hll_sketch and not a serialized hll_union.
-  datasketches::hll_sketch src_sketch =
-      datasketches::hll_sketch::deserialize(reinterpret_cast<char*>(src.ptr), src.len);
+  datasketches::hll_sketch src_sketch(DS_SKETCH_CONFIG, DS_HLL_TYPE);
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
 
   datasketches::hll_union* dst_union_ptr =
       reinterpret_cast<datasketches::hll_union*>(dst->ptr);
@@ -1817,9 +1820,9 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
   return result;
 }
 
-void AggregateFunctions::DsKllInit(FunctionContext* ctx, StringVal* dst) {
-  AllocBuffer(ctx, dst, sizeof(datasketches::kll_sketch<float>));
-  if (UNLIKELY(dst->is_null)) {
+void AggregateFunctions::DsKllInitHelper(FunctionContext* ctx, StringVal* slot) {
+  AllocBuffer(ctx, slot, sizeof(datasketches::kll_sketch<float>));
+  if (UNLIKELY(slot->is_null)) {
     DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
     return;
   }
@@ -1827,21 +1830,11 @@ void AggregateFunctions::DsKllInit(FunctionContext* ctx, StringVal* dst) {
   // data it keeps track of. This is because it's a wrapper class that holds all the
   // inserted data on heap. Here, we put only the wrapper class into a StringVal.
   datasketches::kll_sketch<float>* sketch_ptr =
-      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
+      reinterpret_cast<datasketches::kll_sketch<float>*>(slot->ptr);
   *sketch_ptr = datasketches::kll_sketch<float>();
 }
 
-void AggregateFunctions::DsKllUpdate(FunctionContext* ctx, const FloatVal& src,
-    StringVal* dst) {
-  if (src.is_null) return;
-  DCHECK(!dst->is_null);
-  DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
-  datasketches::kll_sketch<float>* sketch_ptr =
-      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
-  sketch_ptr->update(src.val);
-}
-
-StringVal AggregateFunctions::DsKllSerialize(FunctionContext* ctx,
+StringVal AggregateFunctions::DsKllSerializeHelper(FunctionContext* ctx,
     const StringVal& src) {
   DCHECK(!src.is_null);
   DCHECK_EQ(src.len, sizeof(datasketches::kll_sketch<float>));
@@ -1852,21 +1845,30 @@ StringVal AggregateFunctions::DsKllSerialize(FunctionContext* ctx,
   return dst;
 }
 
-void AggregateFunctions::DsKllMerge(
-    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+void AggregateFunctions::DsKllMergeHelper(FunctionContext* ctx, const StringVal& src,
+      StringVal* dst) {
   DCHECK(!src.is_null);
   DCHECK(!dst->is_null);
   DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
-  datasketches::kll_sketch<float> src_sketch =
-      datasketches::kll_sketch<float>::deserialize((void*)src.ptr, src.len);
+  datasketches::kll_sketch<float> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
 
   datasketches::kll_sketch<float>* dst_sketch_ptr =
       reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
 
-  dst_sketch_ptr->merge(src_sketch);
+  try {
+    dst_sketch_ptr->merge(src_sketch);
+  } catch (const std::exception& e) {
+    ctx->SetError(Substitute("Error while merging DataSketches KLL sketches. "
+        "Message: $0", e.what()).c_str());
+    return;
+  }
 }
 
-StringVal AggregateFunctions::DsKllFinalizeSketch(FunctionContext* ctx,
+StringVal AggregateFunctions::DsKllFinalizeHelper(FunctionContext* ctx,
     const StringVal& src) {
   DCHECK(!src.is_null);
   DCHECK_EQ(src.len, sizeof(datasketches::kll_sketch<float>));
@@ -1881,6 +1883,78 @@ StringVal AggregateFunctions::DsKllFinalizeSketch(FunctionContext* ctx,
   return dst;
 }
 
+void AggregateFunctions::DsKllInit(FunctionContext* ctx, StringVal* dst) {
+  DsKllInitHelper(ctx, dst);
+}
+
+void AggregateFunctions::DsKllUpdate(FunctionContext* ctx, const FloatVal& src,
+    StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
+  datasketches::kll_sketch<float>* sketch_ptr =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
+  sketch_ptr->update(src.val);
+}
+
+StringVal AggregateFunctions::DsKllSerialize(FunctionContext* ctx,
+    const StringVal& src) {
+  return DsKllSerializeHelper(ctx, src);
+}
+
+void AggregateFunctions::DsKllMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  DsKllMergeHelper(ctx, src, dst);
+}
+
+StringVal AggregateFunctions::DsKllFinalizeSketch(FunctionContext* ctx,
+    const StringVal& src) {
+  return DsKllFinalizeHelper(ctx, src);
+}
+
+void AggregateFunctions::DsKllUnionInit(FunctionContext* ctx, StringVal* slot) {
+  // Note, comparing to HLL Union with hll_union type, for KLL Union there is no such
+  // type as kll_union. As a result kll_sketch is used here to store intermediate results
+  // of the union operation.
+  DsKllInitHelper(ctx, slot);
+}
+
+void AggregateFunctions::DsKllUnionUpdate(FunctionContext* ctx, const StringVal& src,
+    StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::kll_sketch<float>));
+  datasketches::kll_sketch<float> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
+  datasketches::kll_sketch<float>* dst_sketch =
+      reinterpret_cast<datasketches::kll_sketch<float>*>(dst->ptr);
+  try {
+    dst_sketch->merge(src_sketch);
+  } catch (const std::exception& e) {
+    ctx->SetError(Substitute("Error while merging DataSketches KLL sketches. "
+        "Message: $0", e.what()).c_str());
+    return;
+  }
+}
+
+StringVal AggregateFunctions::DsKllUnionSerialize(FunctionContext* ctx,
+    const StringVal& src) {
+  return DsKllSerializeHelper(ctx, src);
+}
+
+void AggregateFunctions::DsKllUnionMerge(FunctionContext* ctx, const StringVal& src,
+    StringVal* dst) {
+  DsKllMergeHelper(ctx, src, dst);
+}
+
+StringVal AggregateFunctions::DsKllUnionFinalize(FunctionContext* ctx,
+    const StringVal& src) {
+  return DsKllFinalizeHelper(ctx, src);
+}
+
 /// Intermediate aggregation state for the SampledNdv() function.
 /// Stores NUM_HLL_BUCKETS of the form <row_count, hll_state>.
 /// The 'row_count' keeps track of how many input rows were aggregated into that
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 487c451..9c7cdaf 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -257,6 +257,21 @@ class AggregateFunctions {
   static void DsKllMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static StringVal DsKllFinalizeSketch(FunctionContext*, const StringVal& src);
 
+  // Helper functions to keep common code for DataSketches KLL sketch and union
+  // operations.
+  static void DsKllInitHelper(FunctionContext* ctx, StringVal* slot);
+  static StringVal DsKllSerializeHelper(FunctionContext* ctx, const StringVal& src);
+  static void DsKllMergeHelper(FunctionContext* ctx, const StringVal& src,
+      StringVal* dst);
+  static StringVal DsKllFinalizeHelper(FunctionContext*, const StringVal& src);
+
+  /// These functions implement ds_kll_union().
+  static void DsKllUnionInit(FunctionContext*, StringVal* slot);
+  static void DsKllUnionUpdate(FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsKllUnionSerialize(FunctionContext*, const StringVal& src);
+  static void DsKllUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsKllUnionFinalize(FunctionContext*, const StringVal& src);
+
   /// Estimates the number of distinct values (NDV) based on a sample of data and the
   /// corresponding sampling rate. The main idea of this function is to collect several
   /// (x,y) data points where x is the number of rows and y is the corresponding NDV
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 5969549..9094a31 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1323,6 +1323,7 @@ public class BuiltinsDb extends Db {
         prefix + "10CountMergeEPN10impala_udf15FunctionContextERKNS1_9BigIntValEPS4_",
         null, null));
 
+    // DataSketches KLL sketch
     db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_kll_sketch",
         Lists.<Type>newArrayList(Type.FLOAT), Type.STRING, Type.STRING,
         prefix + "9DsKllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
@@ -1333,6 +1334,20 @@ public class BuiltinsDb extends Db {
         prefix + "19DsKllFinalizeSketchEPN10impala_udf15FunctionContextERKNS1_" +
             "9StringValE", true, false, true));
 
+    // DataSketches KLL union
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_kll_union",
+        Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
+        prefix + "14DsKllUnionInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+            "16DsKllUnionUpdateEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "15DsKllUnionMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "19DsKllUnionSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+            "18DsKllUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        true, false, true));
+
     // The following 3 functions are never directly executed because they get rewritten
     db.addBuiltin(AggregateFunction.createAnalyticBuiltin(
         db, "percent_rank", Lists.<Type>newArrayList(), Type.DOUBLE, Type.STRING));
diff --git a/testdata/data/README b/testdata/data/README
index 41ddeac..18c4ce9 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -519,6 +519,10 @@ DataSketches KLL sketches created by Hive. Each column is for a different purpos
   - 'some_nans': Floats with some NaN values.
   - 'all_nans': All values are NaNs.
 
+kll_sketches_from_impala.parquet:
+This holds the same sketches as kll_sketches_from_hive.parquet but these sketches were
+created by Impala instead of Hive.
+
 hudi_parquet:
 IMPALA-8778: Support read Apache Hudi tables
 Hudi parquet is a special format of parquet files managed by Apache Hudi
diff --git a/testdata/data/kll_sketches_from_impala.parquet b/testdata/data/kll_sketches_from_impala.parquet
new file mode 100644
index 0000000..a3b848a
Binary files /dev/null and b/testdata/data/kll_sketches_from_impala.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
index 3986caa..abe3426 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-kll.test
@@ -107,27 +107,29 @@ STRING
 # Note, the plan is to write sketches as binary instead of strings. For this we have to
 # wait for the binary support (IMPALA-9482).
 create table sketch_store
-    (year int, month int, float_sketch string)
+    (year int, month int, id_sketch string, float_sketch string)
 stored as parquet;
 insert into sketch_store
     select
         year,
         month,
+        ds_kll_sketch(id),
         ds_kll_sketch(float_col)
     from functional_parquet.alltypessmall
     group by year, month;
 select
     year,
     month,
+    ds_kll_quantile(id_sketch, 0.5),
     ds_kll_quantile(float_sketch, 0.5)
 from sketch_store;
 ---- RESULTS
-2009,1,4.400000095367432
-2009,2,4.400000095367432
-2009,3,4.400000095367432
-2009,4,4.400000095367432
+2009,1,12,4.400000095367432
+2009,2,37,4.400000095367432
+2009,3,62,4.400000095367432
+2009,4,87,4.400000095367432
 ---- TYPES
-INT,INT,FLOAT
+INT,INT,FLOAT,FLOAT
 ====
 ---- QUERY
 # Check that sketches made by Hive can be read and used for estimating by Impala.
@@ -232,3 +234,57 @@ from kll_sketches_from_hive;
 ---- TYPES
 DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE
 ====
+---- QUERY
+# Unions the sketches from sketch_store and checks if the union produces the same result
+# as if the whole data was sketched together into a single sketch.
+select
+    ds_kll_quantile(ds_kll_union(id_sketch), 0.5),
+    ds_kll_quantile(ds_kll_union(float_sketch), 0.5)
+from sketch_store;
+---- TYPES
+FLOAT,FLOAT
+---- RESULTS
+50,4.400000095367432
+====
+---- QUERY
+# Checks that ds_kll_union() produces NULL for an empty dataset.
+select ds_kll_union(field) from functional_parquet.emptytable;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Checks that ds_kll_union() produces NULL for NULL inputs.
+select ds_kll_union(null_str) from functional_parquet.nullrows;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# ds_kll_union() returns an error if it receives an invalid serialized sketch.
+select ds_kll_union(date_string_col) from functional_parquet.alltypestiny where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Get the same sketches from Impala and Hive and put them into the same table. When we
+# get the estimates from the unions of these sketches the expectation is to get the same
+# results as if these sketches were used separately to get the estimates.
+create table kll_sketches_impala_hive like kll_sketches_from_impala stored as parquet;
+insert into kll_sketches_impala_hive select * from kll_sketches_from_hive;
+insert into kll_sketches_impala_hive select * from kll_sketches_from_impala;
+select
+    ds_kll_quantile(ds_kll_union(f), 0.5),
+    ds_kll_quantile(ds_kll_union(repetitions), 0.5),
+    ds_kll_quantile(ds_kll_union(some_nulls), 0.5),
+    ds_kll_quantile(ds_kll_union(all_nulls), 0.5),
+    ds_kll_quantile(ds_kll_union(some_nans), 0.5),
+    ds_kll_quantile(ds_kll_union(all_nans), 0.5)
+from kll_sketches_impala_hive;
+---- TYPES
+FLOAT,FLOAT,FLOAT,FLOAT,FLOAT,FLOAT
+---- RESULTS
+100.1999969482422,25000.099609375,50.90000152587891,NULL,50.5,NULL
+====
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index 1634387..a0f655e 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -39,4 +39,5 @@ class TestDatasketches(ImpalaTestSuite):
 
   def test_kll(self, vector, unique_database):
     create_table_from_parquet(self.client, unique_database, 'kll_sketches_from_hive')
+    create_table_from_parquet(self.client, unique_database, 'kll_sketches_from_impala')
     self.run_test_case('QueryTest/datasketches-kll', vector, unique_database)


[impala] 01/02: IMPALA-10039: Fixed Expr-test crash caused by thread unsafe function

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 99443559bd701275ab53c7b7daa5ce1d144f0d49
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Aug 5 22:00:36 2020 -0700

    IMPALA-10039: Fixed Expr-test crash caused by thread unsafe function
    
    A recent patch for IMPALA-5746 registers a callback function for
    cluster membership updates. The callback function cancels the
    queries scheduled by the failed coordinators. This callback function
    was called during Expr-test and caused a crash.
    This patch checks whether the process is running for tests and only
    registers the callback function if it's not running for BE/FE tests.
    
    Testing:
     - The issue could be reproduced by running expr-test for 10-20
       iterations. Verified the fix by running expr-test for over 1000
       iterations without a crash.
     - Passed TestProcessFailures::test_kill_coordinator.
     - Passed core tests.
    
    Change-Id: I85245bf4bffb469913d53741847e67773b7d4627
    Reviewed-on: http://gerrit.cloudera.org:8080/16299
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 063622c..9c1e15b 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -574,7 +574,7 @@ void ExecEnv::SetImpalaServer(ImpalaServer* server) {
           server->CancelQueriesOnFailedBackends(current_backend_set);
         });
   }
-  if (FLAGS_is_executor) {
+  if (FLAGS_is_executor && !TestInfo::is_test()) {
     cluster_membership_mgr_->RegisterUpdateCallbackFn(
         [](ClusterMembershipMgr::SnapshotPtr snapshot) {
           std::unordered_set<BackendIdPB> current_backend_set;