You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2021/05/14 16:53:13 UTC
[impala] branch master updated (c8adc9c -> 4697db0)
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.
from c8adc9c Bump up GBN to 13088653
new b1326f7 IMPALA-10687: Implement ds_cpc_union() function
new 4697db0 IMPALA-5121: Fix AVG() on timestamp col with use_local_tz_for_unix_timestamp_conversions
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
be/src/exprs/aggregate-functions-ir.cc | 93 +++++++++++++++++++--
be/src/exprs/aggregate-functions.h | 7 ++
.../java/org/apache/impala/catalog/BuiltinsDb.java | 14 ++++
testdata/data/README | 4 +
testdata/data/cpc_sketches_from_impala.parquet | Bin 0 -> 3420 bytes
.../queries/QueryTest/datasketches-cpc.test | 73 ++++++++++++++--
.../queries/QueryTest/utc-timestamp-functions.test | 12 +++
tests/query_test/test_datasketches.py | 1 +
8 files changed, 192 insertions(+), 12 deletions(-)
create mode 100644 testdata/data/cpc_sketches_from_impala.parquet
[impala] 01/02: IMPALA-10687: Implement ds_cpc_union() function
Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b1326f7eff62dd521989132d650fa8b53057085b
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Tue Apr 13 18:50:37 2021 +0800
IMPALA-10687: Implement ds_cpc_union() function
This function receives a set of serialized Apache DataSketches CPC
sketches produced by ds_cpc_sketch() and merges them into a single
sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table and, depending on which
partitions the user is interested in, union the relevant sketches
together to get an estimate. E.g.:
SELECT
ds_cpc_estimate(ds_cpc_union(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_cpc_union() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.lineitem, created a number of sketches with grouping
by l_shipdate and called ds_cpc_union() on those sketches.
Change-Id: Ib94b45ae79efcc11adc077dd9df9b9868ae82cb6
Reviewed-on: http://gerrit.cloudera.org:8080/17372
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 84 +++++++++++++++++++++
be/src/exprs/aggregate-functions.h | 7 ++
.../java/org/apache/impala/catalog/BuiltinsDb.java | 14 ++++
testdata/data/README | 4 +
testdata/data/cpc_sketches_from_impala.parquet | Bin 0 -> 3420 bytes
.../queries/QueryTest/datasketches-cpc.test | 73 ++++++++++++++++--
tests/query_test/test_datasketches.py | 1 +
7 files changed, 177 insertions(+), 6 deletions(-)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 774a64d..9048df8 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1653,6 +1653,16 @@ StringVal SerializeDsCpcSketch(
return StringStreamToStringVal(ctx, serialized_input);
}
+/// Auxiliary function that receives a cpc_union, gets the underlying CPC sketch from the
+/// union object and returns the serialized, CPC sketch wrapped into StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches CPC
+/// functionality into the header.
+StringVal SerializeDsCpcUnion(
+ FunctionContext* ctx, const datasketches::cpc_union& ds_union) {
+ datasketches::cpc_sketch sketch = ds_union.get_result();
+ return SerializeDsCpcSketch(ctx, sketch);
+}
+
/// Auxiliary function that receives a theta_sketch and returns the serialized version of
/// it wrapped into a StringVal.
/// Introducing this function in the .cc to avoid including the whole DataSketches Theta
@@ -1956,6 +1966,80 @@ StringVal AggregateFunctions::DsCpcFinalizeSketch(
return result_str;
}
+/// Initializes the intermediate state of ds_cpc_union(): asks AllocBuffer() for
+/// sizeof(datasketches::cpc_union) bytes in 'slot' and stores an empty CPC union there.
+/// If 'slot' is NULL after AllocBuffer() the function returns without touching it.
+void AggregateFunctions::DsCpcUnionInit(FunctionContext* ctx, StringVal* slot) {
+ AllocBuffer(ctx, slot, sizeof(datasketches::cpc_union));
+ if (UNLIKELY(slot->is_null)) {
+ DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+ return;
+ }
+ datasketches::cpc_union* union_ptr =
+ reinterpret_cast<datasketches::cpc_union*>(slot->ptr);
+ // Use the CPC-specific config, i.e. the same one DsCpcUnionUpdate() and
+ // DsCpcUnionMerge() use for the sketches they feed into this union.
+ *union_ptr = datasketches::cpc_union(DS_CPC_SKETCH_CONFIG);
+}
+
+void AggregateFunctions::DsCpcUnionUpdate(
+ FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+ if (src.is_null) return;
+ DCHECK(!dst->is_null);
+ DCHECK_EQ(dst->len, sizeof(datasketches::cpc_union));
+ // These parameters might be overwritten by DeserializeDsSketch() to use the settings
+ // from the deserialized sketch from 'src'.
+ datasketches::cpc_sketch src_sketch(DS_CPC_SKETCH_CONFIG);
+ if (!DeserializeDsSketch(src, &src_sketch)) {
+ LogSketchDeserializationError(ctx);
+ return;
+ }
+ datasketches::cpc_union* union_ptr =
+ reinterpret_cast<datasketches::cpc_union*>(dst->ptr);
+ union_ptr->update(src_sketch);
+}
+
+StringVal AggregateFunctions::DsCpcUnionSerialize(
+ FunctionContext* ctx, const StringVal& src) {
+ DCHECK(!src.is_null);
+ DCHECK_EQ(src.len, sizeof(datasketches::cpc_union));
+ datasketches::cpc_union* union_ptr =
+ reinterpret_cast<datasketches::cpc_union*>(src.ptr);
+ StringVal dst = SerializeDsCpcUnion(ctx, *union_ptr);
+ ctx->Free(src.ptr);
+ return dst;
+}
+
+void AggregateFunctions::DsCpcUnionMerge(
+ FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+ DCHECK(!src.is_null);
+ DCHECK(!dst->is_null);
+ DCHECK_EQ(dst->len, sizeof(datasketches::cpc_union));
+
+ // Note, 'src' is a serialized Cpc_sketch and not a serialized Cpc_union.
+ datasketches::cpc_sketch src_sketch(DS_CPC_SKETCH_CONFIG);
+ if (!DeserializeDsSketch(src, &src_sketch)) {
+ LogSketchDeserializationError(ctx);
+ return;
+ }
+
+ datasketches::cpc_union* dst_union_ptr =
+ reinterpret_cast<datasketches::cpc_union*>(dst->ptr);
+
+ dst_union_ptr->update(src_sketch);
+}
+
+StringVal AggregateFunctions::DsCpcUnionFinalize(
+ FunctionContext* ctx, const StringVal& src) {
+ DCHECK(!src.is_null);
+ DCHECK_EQ(src.len, sizeof(datasketches::cpc_union));
+ datasketches::cpc_union* union_ptr =
+ reinterpret_cast<datasketches::cpc_union*>(src.ptr);
+ datasketches::cpc_sketch sketch = union_ptr->get_result();
+ if (sketch.get_estimate() == 0.0) {
+ ctx->Free(src.ptr);
+ return StringVal::null();
+ }
+ StringVal result = SerializeDsCpcSketch(ctx, sketch);
+ ctx->Free(src.ptr);
+ return result;
+}
+
void AggregateFunctions::DsThetaInit(FunctionContext* ctx, StringVal* dst) {
AllocBuffer(ctx, dst, sizeof(datasketches::update_theta_sketch));
if (UNLIKELY(dst->is_null)) {
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index ae95439..cec5b8e 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -259,6 +259,13 @@ class AggregateFunctions {
static BigIntVal DsCpcFinalize(FunctionContext*, const StringVal& src);
static StringVal DsCpcFinalizeSketch(FunctionContext*, const StringVal& src);
+ /// These functions implement ds_cpc_union().
+ static void DsCpcUnionInit(FunctionContext*, StringVal* dst);
+ static void DsCpcUnionUpdate(FunctionContext*, const StringVal& src, StringVal* dst);
+ static StringVal DsCpcUnionSerialize(FunctionContext*, const StringVal& src);
+ static void DsCpcUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+ static StringVal DsCpcUnionFinalize(FunctionContext*, const StringVal& src);
+
/// These functions implement Apache DataSketches Theta support for sketching.
static void DsThetaInit(FunctionContext*, StringVal* slot);
template <typename T>
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 38576ad..356cf54 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1463,6 +1463,20 @@ public class BuiltinsDb extends Db {
"18DsHllUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
+ // DataSketches CPC union
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_cpc_union",
+ Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
+ prefix + "14DsCpcUnionInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix +
+ "16DsCpcUnionUpdateEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ prefix +
+ "15DsCpcUnionMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ prefix +
+ "19DsCpcUnionSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix +
+ "18DsCpcUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ true, false, true));
+
// DataSketches Theta union
db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_union",
Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
diff --git a/testdata/data/README b/testdata/data/README
index 74e5a4b..3325e65 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -515,6 +515,10 @@ DataSketches CPC sketches created by Hive. Each column contains a sketch for a
specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
STRING, CHAR and VARCHAR. Has an additional column for NULL values.
+cpc_sketches_from_impala.parquet:
+This holds the same sketches as cpc_sketches_from_hive.parquet but these sketches were
+created by Impala instead of Hive.
+
theta_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches Theta sketches created by Hive. Each column contains a sketch for a
diff --git a/testdata/data/cpc_sketches_from_impala.parquet b/testdata/data/cpc_sketches_from_impala.parquet
new file mode 100644
index 0000000..77618d7
Binary files /dev/null and b/testdata/data/cpc_sketches_from_impala.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
index 2f6db47..b05eb58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
@@ -10,12 +10,13 @@ select
ds_cpc_estimate(ds_cpc_sketch(bigint_col)),
ds_cpc_estimate(ds_cpc_sketch(float_col)),
ds_cpc_estimate(ds_cpc_sketch(double_col)),
- ds_cpc_estimate(ds_cpc_sketch(string_col))
+ ds_cpc_estimate(ds_cpc_sketch(string_col)),
+ ds_cpc_estimate(ds_cpc_sketch(date_string_col))
from functional_parquet.alltypessmall
---- RESULTS
-10,10,10,10,10,10
+10,10,10,10,10,10,12
---- TYPES
-BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
====
---- QUERY
select
@@ -24,12 +25,13 @@ select
ds_cpc_sketch_and_estimate(bigint_col),
ds_cpc_sketch_and_estimate(float_col),
ds_cpc_sketch_and_estimate(double_col),
- ds_cpc_sketch_and_estimate(string_col)
+ ds_cpc_sketch_and_estimate(string_col),
+ ds_cpc_sketch_and_estimate(date_string_col)
from functional_parquet.alltypessmall
---- RESULTS
-10,10,10,10,10,10
+10,10,10,10,10,10,12
---- TYPES
-BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
====
---- QUERY
# Check that unsupported types give error with ds_cpc_sketch().
@@ -185,3 +187,62 @@ from empty_string
---- TYPES
BIGINT,BIGINT,BIGINT
====
+---- QUERY
+# Unions the sketches from cpc_sketch_store and checks if the union produces the same
+# result as if the whole data was sketched together into a single sketch.
+select
+ ds_cpc_estimate(ds_cpc_union(date_sketch)),
+ ds_cpc_estimate(ds_cpc_union(float_sketch))
+from cpc_sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+12,10
+====
+---- QUERY
+# Checks that ds_cpc_union() produces NULL for an empty dataset.
+select ds_cpc_union(field) from functional_parquet.emptytable;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Checks that ds_cpc_union() produces NULL for NULL inputs.
+select ds_cpc_union(null_str) from functional_parquet.nullrows;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# ds_cpc_union() returns an error if it receives an invalid serialized sketch.
+select ds_cpc_union(date_string_col) from functional_parquet.alltypestiny where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Get the same sketches from Impala and Hive and put them into the same table. When we
+# get the estimates from the unions of these sketches the expectation is to get the same
+# results as if these sketches were used separately to get the estimates. However, for
+# string types (STRING, CHAR, VARCHAR) we see the numbers doubling up because of the
+# difference between how Impala and Hive uses these types. Like IMPALA-9939.
+create table cpc_sketches_impala_hive like cpc_sketches_from_impala stored as parquet;
+insert into cpc_sketches_impala_hive select * from cpc_sketches_from_hive;
+insert into cpc_sketches_impala_hive select * from cpc_sketches_from_impala;
+select
+ ds_cpc_estimate(ds_cpc_union(ti)) as ti,
+ ds_cpc_estimate(ds_cpc_union(i)) as i,
+ ds_cpc_estimate(ds_cpc_union(bi)) as bi,
+ ds_cpc_estimate(ds_cpc_union(f)) as f,
+ ds_cpc_estimate(ds_cpc_union(d)) as d,
+ ds_cpc_estimate(ds_cpc_union(s)) as s,
+ ds_cpc_estimate(ds_cpc_union(c)) as c,
+ ds_cpc_estimate(ds_cpc_union(v)) as v,
+ ds_cpc_estimate(ds_cpc_union(nc)) as nc
+from cpc_sketches_impala_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,8,8,6,NULL
+====
\ No newline at end of file
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index 117045f..89e2da2 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -39,6 +39,7 @@ class TestDatasketches(ImpalaTestSuite):
def test_cpc(self, vector, unique_database):
create_table_from_parquet(self.client, unique_database, 'cpc_sketches_from_hive')
+ create_table_from_parquet(self.client, unique_database, 'cpc_sketches_from_impala')
self.run_test_case('QueryTest/datasketches-cpc', vector, unique_database)
def test_theta(self, vector, unique_database):
[impala] 02/02: IMPALA-5121: Fix AVG() on timestamp col with
use_local_tz_for_unix_timestamp_conversions
Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4697db021411e95165cfd199e1aa3fc848dbfc8b
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Sat May 8 08:15:30 2021 +0200
IMPALA-5121: Fix AVG() on timestamp col with use_local_tz_for_unix_timestamp_conversions
AVG used to contain a back and forth timezone conversion if
use_local_tz_for_unix_timestamp_conversions is true. This could
affect the results if there were values from different DST rules.
Note that AVG on timestamps has other issues besides this, see
IMPALA-7472 for details.
Testing:
- added a regression test
Change-Id: I999099de8e07269b96b75d473f5753be4479cecd
Reviewed-on: http://gerrit.cloudera.org:8080/17412
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 9 +++------
.../queries/QueryTest/utc-timestamp-functions.test | 12 ++++++++++++
2 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 9048df8..a5833ae 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -354,8 +354,7 @@ void AggregateFunctions::TimestampAvgUpdate(FunctionContext* ctx,
AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
double val;
- const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
- if (tm_src.ToSubsecondUnixTime(tz, &val)) {
+ if (tm_src.ToSubsecondUnixTime(UTCPTR, &val)) {
avg->sum += val;
++avg->count;
}
@@ -369,8 +368,7 @@ void AggregateFunctions::TimestampAvgRemove(FunctionContext* ctx,
AvgState* avg = reinterpret_cast<AvgState*>(dst->ptr);
const TimestampValue& tm_src = TimestampValue::FromTimestampVal(src);
double val;
- const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
- if (tm_src.ToSubsecondUnixTime(tz, &val)) {
+ if (tm_src.ToSubsecondUnixTime(UTCPTR, &val)) {
avg->sum -= val;
--avg->count;
DCHECK_GE(avg->count, 0);
@@ -381,9 +379,8 @@ TimestampVal AggregateFunctions::TimestampAvgGetValue(FunctionContext* ctx,
const StringVal& src) {
AvgState* val_struct = reinterpret_cast<AvgState*>(src.ptr);
if (val_struct->count == 0) return TimestampVal::null();
- const Timezone* tz = ctx->impl()->state()->time_zone_for_unix_time_conversions();
const TimestampValue& tv = TimestampValue::FromSubsecondUnixTime(
- val_struct->sum / val_struct->count, tz);
+ val_struct->sum / val_struct->count, UTCPTR);
if (tv.HasDate()) {
TimestampVal result;
tv.ToTimestampVal(&result);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
index a74ef68..1e60a56 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/utc-timestamp-functions.test
@@ -100,3 +100,15 @@ TIMESTAMP
---- RESULTS
1969-12-31 16:00:00
====
+---- QUERY
+# Regression test for IMPALA-5121. AVG used to contain a back and forth timezone
+# conversion (if use_local_tz_for_unix_timestamp_conversions is true) that could affect
+# the results if there were values from different DST rules.
+SET timezone=CET;
+SET use_local_tz_for_unix_timestamp_conversions=1;
+select avg(timestamp_col) from functional.alltypestiny;
+---- TYPES
+TIMESTAMP
+---- RESULTS
+2009-02-15 00:00:30
+====
\ No newline at end of file