You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/05/14 16:53:13 UTC

[impala] branch master updated (c8adc9c -> 4697db0)

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c8adc9c  Bump up GBN to 13088653
     new b1326f7  IMPALA-10687: Implement ds_cpc_union() function
     new 4697db0  IMPALA-5121: Fix AVG() on timestamp col with use_local_tz_for_unix_timestamp_conversions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exprs/aggregate-functions-ir.cc             |  93 +++++++++++++++++++--
 be/src/exprs/aggregate-functions.h                 |   7 ++
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  14 ++++
 testdata/data/README                               |   4 +
 testdata/data/cpc_sketches_from_impala.parquet     | Bin 0 -> 3420 bytes
 .../queries/QueryTest/datasketches-cpc.test        |  73 ++++++++++++++--
 .../queries/QueryTest/utc-timestamp-functions.test |  12 +++
 tests/query_test/test_datasketches.py              |   1 +
 8 files changed, 192 insertions(+), 12 deletions(-)
 create mode 100644 testdata/data/cpc_sketches_from_impala.parquet

[impala] 01/02: IMPALA-10687: Implement ds_cpc_union() function

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b1326f7eff62dd521989132d650fa8b53057085b
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Tue Apr 13 18:50:37 2021 +0800

    IMPALA-10687: Implement ds_cpc_union() function
    
    This function receives a set of serialized Apache DataSketches CPC
    sketches produced by ds_cpc_sketch() and merges them into a single
    sketch.
    
    An example usage is to create a sketch for each partition of a table,
    write these sketches to a separate table and, based on which partitions
    the user is interested in, the relevant sketches can be union-ed
    together to get an estimate. E.g.:
      SELECT
          ds_cpc_estimate(ds_cpc_union(sketch_col))
      FROM sketch_tbl
      WHERE partition_col=1 OR partition_col=5;
    
    Testing:
      - Apart from the automated tests I added to this patch I also
        tested ds_cpc_union() on a bigger dataset to check that
        serialization, deserialization and merging steps work well. I
        took TPCH25.lineitem, created a number of sketches with grouping
        by l_shipdate and called ds_cpc_union() on those sketches.
    
    Change-Id: Ib94b45ae79efcc11adc077dd9df9b9868ae82cb6
    Reviewed-on: http://gerrit.cloudera.org:8080/17372
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             |  84 +++++++++++++++++++++
 be/src/exprs/aggregate-functions.h                 |   7 ++
 .../java/org/apache/impala/catalog/BuiltinsDb.java |  14 ++++
 testdata/data/README                               |   4 +
 testdata/data/cpc_sketches_from_impala.parquet     | Bin 0 -> 3420 bytes
 .../queries/QueryTest/datasketches-cpc.test        |  73 ++++++++++++++++--
 tests/query_test/test_datasketches.py              |   1 +
 7 files changed, 177 insertions(+), 6 deletions(-)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 774a64d..9048df8 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1653,6 +1653,16 @@ StringVal SerializeDsCpcSketch(
   return StringStreamToStringVal(ctx, serialized_input);
 }
 
+/// Auxiliary function that receives a cpc_union, gets the underlying CPC sketch from the
+/// union object and returns the serialized CPC sketch wrapped into a StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches CPC
+/// functionality into the header.
+StringVal SerializeDsCpcUnion(
+    FunctionContext* ctx, const datasketches::cpc_union& ds_union) {
+  datasketches::cpc_sketch sketch = ds_union.get_result();
+  return SerializeDsCpcSketch(ctx, sketch);
+}
+
 /// Auxiliary function that receives a theta_sketch and returns the serialized version of
 /// it wrapped into a StringVal.
 /// Introducing this function in the .cc to avoid including the whole DataSketches Theta
@@ -1956,6 +1966,80 @@ StringVal AggregateFunctions::DsCpcFinalizeSketch(
   return result_str;
 }
 
+void AggregateFunctions::DsCpcUnionInit(FunctionContext* ctx, StringVal* slot) {
+  AllocBuffer(ctx, slot, sizeof(datasketches::cpc_union));
+  if (UNLIKELY(slot->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  datasketches::cpc_union* union_ptr =
+      reinterpret_cast<datasketches::cpc_union*>(slot->ptr);
+  *union_ptr = datasketches::cpc_union(DS_SKETCH_CONFIG);
+}
+
+void AggregateFunctions::DsCpcUnionUpdate(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::cpc_union));
+  // These parameters might be overwritten by DeserializeDsSketch() to use the settings
+  // from the deserialized sketch from 'src'.
+  datasketches::cpc_sketch src_sketch(DS_CPC_SKETCH_CONFIG);
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
+  datasketches::cpc_union* union_ptr =
+      reinterpret_cast<datasketches::cpc_union*>(dst->ptr);
+  union_ptr->update(src_sketch);
+}
+
+StringVal AggregateFunctions::DsCpcUnionSerialize(
+    FunctionContext* ctx, const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::cpc_union));
+  datasketches::cpc_union* union_ptr =
+      reinterpret_cast<datasketches::cpc_union*>(src.ptr);
+  StringVal dst = SerializeDsCpcUnion(ctx, *union_ptr);
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+void AggregateFunctions::DsCpcUnionMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  DCHECK(!src.is_null);
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::cpc_union));
+
+  // Note, 'src' is a serialized Cpc_sketch and not a serialized Cpc_union.
+  datasketches::cpc_sketch src_sketch(DS_CPC_SKETCH_CONFIG);
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
+
+  datasketches::cpc_union* dst_union_ptr =
+      reinterpret_cast<datasketches::cpc_union*>(dst->ptr);
+
+  dst_union_ptr->update(src_sketch);
+}
+
+StringVal AggregateFunctions::DsCpcUnionFinalize(
+    FunctionContext* ctx, const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::cpc_union));
+  datasketches::cpc_union* union_ptr =
+      reinterpret_cast<datasketches::cpc_union*>(src.ptr);
+  datasketches::cpc_sketch sketch = union_ptr->get_result();
+  if (sketch.get_estimate() == 0.0) {
+    ctx->Free(src.ptr);
+    return StringVal::null();
+  }
+  StringVal result = SerializeDsCpcSketch(ctx, sketch);
+  ctx->Free(src.ptr);
+  return result;
+}
+
 void AggregateFunctions::DsThetaInit(FunctionContext* ctx, StringVal* dst) {
   AllocBuffer(ctx, dst, sizeof(datasketches::update_theta_sketch));
   if (UNLIKELY(dst->is_null)) {
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index ae95439..cec5b8e 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -259,6 +259,13 @@ class AggregateFunctions {
   static BigIntVal DsCpcFinalize(FunctionContext*, const StringVal& src);
   static StringVal DsCpcFinalizeSketch(FunctionContext*, const StringVal& src);
 
+  /// These functions implement ds_cpc_union().
+  static void DsCpcUnionInit(FunctionContext*, StringVal* dst);
+  static void DsCpcUnionUpdate(FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsCpcUnionSerialize(FunctionContext*, const StringVal& src);
+  static void DsCpcUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsCpcUnionFinalize(FunctionContext*, const StringVal& src);
+
   /// These functions implement Apache DataSketches Theta support for sketching.
   static void DsThetaInit(FunctionContext*, StringVal* slot);
   template <typename T>
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 38576ad..356cf54 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1463,6 +1463,20 @@ public class BuiltinsDb extends Db {
             "18DsHllUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
         true, false, true));
 
+    // DataSketches CPC union
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_cpc_union",
+        Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
+        prefix + "14DsCpcUnionInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+            "16DsCpcUnionUpdateEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "15DsCpcUnionMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "19DsCpcUnionSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+            "18DsCpcUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        true, false, true));
+
     // DataSketches Theta union
     db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_union",
         Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
diff --git a/testdata/data/README b/testdata/data/README
index 74e5a4b..3325e65 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -515,6 +515,10 @@ DataSketches CPC sketches created by Hive. Each column contains a sketch for a
 specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
 STRING, CHAR and VARCHAR. Has an additional column for NULL values.
 
+cpc_sketches_from_impala.parquet:
+This holds the same sketches as cpc_sketches_from_hive.parquet but these sketches were
+created by Impala instead of Hive.
+
 theta_sketches_from_hive.parquet:
 This file contains a table that has some string columns to store serialized Apache
 DataSketches Theta sketches created by Hive. Each column contains a sketch for a
diff --git a/testdata/data/cpc_sketches_from_impala.parquet b/testdata/data/cpc_sketches_from_impala.parquet
new file mode 100644
index 0000000..77618d7
Binary files /dev/null and b/testdata/data/cpc_sketches_from_impala.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
index 2f6db47..b05eb58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
@@ -10,12 +10,13 @@ select
     ds_cpc_estimate(ds_cpc_sketch(bigint_col)),
     ds_cpc_estimate(ds_cpc_sketch(float_col)),
     ds_cpc_estimate(ds_cpc_sketch(double_col)),
-    ds_cpc_estimate(ds_cpc_sketch(string_col))
+    ds_cpc_estimate(ds_cpc_sketch(string_col)),
+    ds_cpc_estimate(ds_cpc_sketch(date_string_col))
 from functional_parquet.alltypessmall
 ---- RESULTS
-10,10,10,10,10,10
+10,10,10,10,10,10,12
 ---- TYPES
-BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
 ====
 ---- QUERY
 select
@@ -24,12 +25,13 @@ select
     ds_cpc_sketch_and_estimate(bigint_col),
     ds_cpc_sketch_and_estimate(float_col),
     ds_cpc_sketch_and_estimate(double_col),
-    ds_cpc_sketch_and_estimate(string_col)
+    ds_cpc_sketch_and_estimate(string_col),
+    ds_cpc_sketch_and_estimate(date_string_col)
 from functional_parquet.alltypessmall
 ---- RESULTS
-10,10,10,10,10,10
+10,10,10,10,10,10,12
 ---- TYPES
-BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
 ====
 ---- QUERY
 # Check that unsupported types give error with ds_cpc_sketch().
@@ -185,3 +187,62 @@ from empty_string
 ---- TYPES
 BIGINT,BIGINT,BIGINT
 ====
+---- QUERY
+# Unions the sketches from cpc_sketch_store and checks if the union produces the same
+# result as if the whole data was sketched together into a single sketch.
+select
+    ds_cpc_estimate(ds_cpc_union(date_sketch)),
+    ds_cpc_estimate(ds_cpc_union(float_sketch))
+from cpc_sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+12,10
+====
+---- QUERY
+# Checks that ds_cpc_union() produces NULL for an empty dataset.
+select ds_cpc_union(field) from functional_parquet.emptytable;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Checks that ds_cpc_union() produces NULL for NULL inputs.
+select ds_cpc_union(null_str) from functional_parquet.nullrows;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# ds_cpc_union() returns an error if it receives an invalid serialized sketch.
+select ds_cpc_union(date_string_col) from functional_parquet.alltypestiny where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Get the same sketches from Impala and Hive and put them into the same table. When we
+# get the estimates from the unions of these sketches the expectation is to get the same
+# results as if these sketches were used separately to get the estimates. However, for
+# string types (STRING, CHAR, VARCHAR) we see the numbers doubling up because of the
+# difference between how Impala and Hive use these types, similar to IMPALA-9939.
+create table cpc_sketches_impala_hive like cpc_sketches_from_impala stored as parquet;
+insert into cpc_sketches_impala_hive select * from cpc_sketches_from_hive;
+insert into cpc_sketches_impala_hive select * from cpc_sketches_from_impala;
+select
+    ds_cpc_estimate(ds_cpc_union(ti)) as ti,
+    ds_cpc_estimate(ds_cpc_union(i)) as i,
+    ds_cpc_estimate(ds_cpc_union(bi)) as bi,
+    ds_cpc_estimate(ds_cpc_union(f)) as f,
+    ds_cpc_estimate(ds_cpc_union(d)) as d,
+    ds_cpc_estimate(ds_cpc_union(s)) as s,
+    ds_cpc_estimate(ds_cpc_union(c)) as c,
+    ds_cpc_estimate(ds_cpc_union(v)) as v,
+    ds_cpc_estimate(ds_cpc_union(nc)) as nc
+from cpc_sketches_impala_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,8,8,6,NULL
+====
\ No newline at end of file
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index 117045f..89e2da2 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -39,6 +39,7 @@ class TestDatasketches(ImpalaTestSuite):
 
   def test_cpc(self, vector, unique_database):
     create_table_from_parquet(self.client, unique_database, 'cpc_sketches_from_hive')
+    create_table_from_parquet(self.client, unique_database, 'cpc_sketches_from_impala')
     self.run_test_case('QueryTest/datasketches-cpc', vector, unique_database)
 
   def test_theta(self, vector, unique_database):

[impala] 02/02: IMPALA-5121: Fix AVG() on timestamp col with use_local_tz_for_unix_timestamp_conversions

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4697db021411e95165cfd199e1aa3fc848dbfc8b
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Sat May 8 08:15:30 2021 +0200

    IMPALA-5121: Fix AVG() on timestamp col with use_local_tz_for_unix_timestamp_conversions
    
    AVG used to contain a back and forth timezone conversion if
    use_local_tz_for_unix_timestamp_conversions is true. This could
    affect the results if there were values from different DST rules.
    
    Note that AVG on timestamps has other issues besides this, see
    IMPALA-7472 for details.
    
    Testing:
    - added a regression test
    
    Change-Id: I999099de8e07269b96b75d473f5753be4479cecd
    Reviewed-on: http://gerrit.cloudera.org:8080/17412
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc                       |  9 +++------
 .../queries/QueryTest/utc-timestamp-functions.test           | 12 ++++++++++++
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 9048df8..a5833ae 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -354,8 +354,7 @@ void AggregateFunctions::TimestampAvgUpdate(FunctionContext* ctx,
   AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
   const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
   double val;
-  const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
-  if (tm_src.ToSubsecondUnixTime(tz, &val)) {
+  if (tm_src.ToSubsecondUnixTime(UTCPTR, &val)) {
     avg->sum += val;
     ++avg->count;
   }
@@ -369,8 +368,7 @@ void AggregateFunctions::TimestampAvgRemove(FunctionContext* ctx,
   AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
   const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
   double val;
-  const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
-  if (tm_src.ToSubsecondUnixTime(tz, &val)) {
+  if (tm_src.ToSubsecondUnixTime(UTCPTR, &val)) {
     avg->sum -= val;
     --avg->count;
     DCHECK_GE(avg->count, 0);
@@ -381,9 +379,8 @@ TimestampVal AggregateFunctions::TimestampAvgGetValue(FunctionContext* ctx,
     const StringVal& src) {
   AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
   if (val_struct->count == 0) return TimestampVal::null();
-  const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
   const TimestampValue& tv = TimestampValue::FromSubsecondUnixTime(
-      val_struct->sum / val_struct->count, tz);
+      val_struct->sum / val_struct->count, UTCPTR);
   if (tv.HasDate()) {
     TimestampVal result;
     tv.ToTimestampVal(&result);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
index a74ef68..1e60a56 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
@@ -100,3 +100,15 @@ TIMESTAMP
 ---- RESULTS
 1969-12-31 16:00:00
 ====
+---- QUERY
+# Regression test for IMPALA-5121. AVG used to contain a back and forth timezone
+# conversion (if use_local_tz_for_unix_timestamp_conversions is true) that could affect
+# the results if there were values from different DST rules.
+SET timezone=CET;
+SET use_local_tz_for_unix_timestamp_conversions=1;
+select avg(timestamp_col) from functional.alltypestiny;
+---- TYPES
+TIMESTAMP
+---- RESULTS
+2009-02-15 00:00:30
+====
\ No newline at end of file