You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/07/09 16:09:04 UTC
[impala] 01/02: IMPALA-9294: Support DATE for min-max runtime filter
This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 55099517b059eadb064674768488bfd3287f649a
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sat Jun 20 18:14:50 2020 -0700
IMPALA-9294: Support DATE for min-max runtime filter
Implemented a DATE min-max filter and applied it to Kudu in the same way as
the other min-max runtime filters.
Added new test cases for Date min-max filters.
Testing:
Passed all core tests.
Change-Id: Ic2f6e2dc6949735d5f0fcf317361cc2969a5e82c
Reviewed-on: http://gerrit.cloudera.org:8080/16103
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/codegen/gen_ir_descriptions.py | 1 +
be/src/runtime/date-value.h | 12 ++
be/src/util/min-max-filter-ir.cc | 32 +++--
be/src/util/min-max-filter-test.cc | 142 ++++++++++++---------
be/src/util/min-max-filter.cc | 125 ++++++++++--------
be/src/util/min-max-filter.h | 61 +++++----
common/protobuf/common.proto | 1 +
common/thrift/Data.thrift | 1 +
.../impala/planner/RuntimeFilterGenerator.java | 2 -
.../PlannerTest/min-max-runtime-filters.test | 51 ++++++++
.../queries/QueryTest/all_runtime_filters.test | 4 +-
.../queries/QueryTest/min_max_filters.test | 30 +++++
12 files changed, 295 insertions(+), 167 deletions(-)
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index b15c88a..fbbca0d 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -230,6 +230,7 @@ ir_functions = [
["DOUBLE_MIN_MAX_FILTER_INSERT", "_ZN6impala18DoubleMinMaxFilter6InsertEPKv"],
["STRING_MIN_MAX_FILTER_INSERT", "_ZN6impala18StringMinMaxFilter6InsertEPKv"],
["TIMESTAMP_MIN_MAX_FILTER_INSERT", "_ZN6impala21TimestampMinMaxFilter6InsertEPKv"],
+ ["DATE_MIN_MAX_FILTER_INSERT", "_ZN6impala16DateMinMaxFilter6InsertEPKv"],
["DECIMAL_MIN_MAX_FILTER_INSERT4", "_ZN6impala19DecimalMinMaxFilter7Insert4EPKv"],
["DECIMAL_MIN_MAX_FILTER_INSERT8", "_ZN6impala19DecimalMinMaxFilter7Insert8EPKv"],
["DECIMAL_MIN_MAX_FILTER_INSERT16", "_ZN6impala19DecimalMinMaxFilter8Insert16EPKv"],
diff --git a/be/src/runtime/date-value.h b/be/src/runtime/date-value.h
index e7c48f5..8351252 100644
--- a/be/src/runtime/date-value.h
+++ b/be/src/runtime/date-value.h
@@ -23,6 +23,7 @@
#include "common/logging.h"
#include "common/status.h"
+#include "gen-cpp/common.pb.h"
#include "udf/udf.h"
namespace impala {
@@ -173,6 +174,17 @@ class DateValue {
return DateValue(udf_value.val);
}
+ // Store the days_since_epoch_ of this DateValue in 'pvalue'.
+ void ToColumnValuePB(ColumnValuePB* pvalue) const {
+ DCHECK(pvalue != nullptr);
+ pvalue->set_date_val(days_since_epoch_);
+ }
+
+ // Returns a new DateValue created from the value in 'value_pb'.
+ static DateValue FromColumnValuePB(const ColumnValuePB& value_pb) {
+ return DateValue(value_pb.date_val());
+ }
+
/// Constructors that parse from a date string. See DateParser for details about the
/// date format.
static DateValue ParseSimpleDateFormat(const char* str, int len, bool accept_time_toks);
diff --git a/be/src/util/min-max-filter-ir.cc b/be/src/util/min-max-filter-ir.cc
index 96cfb36..524fa84 100644
--- a/be/src/util/min-max-filter-ir.cc
+++ b/be/src/util/min-max-filter-ir.cc
@@ -57,21 +57,25 @@ void StringMinMaxFilter::Insert(const void* val) {
}
}
-void TimestampMinMaxFilter::Insert(const void* val) {
- if (val == nullptr) return;
- const TimestampValue* value = reinterpret_cast<const TimestampValue*>(val);
- if (always_false_) {
- min_ = *value;
- max_ = *value;
- always_false_ = false;
- } else {
- if (*value < min_) {
- min_ = *value;
- } else if (*value > max_) {
- max_ = *value;
- }
+#define DATE_TIME_MIN_MAX_FILTER_INSERT(NAME, TYPE) \
+ void NAME##MinMaxFilter::Insert(const void* val) { \
+ if (val == nullptr) return; \
+ const TYPE* value = reinterpret_cast<const TYPE*>(val); \
+ if (always_false_) { \
+ min_ = *value; \
+ max_ = *value; \
+ always_false_ = false; \
+ } else { \
+ if (*value < min_) { \
+ min_ = *value; \
+ } else if (*value > max_) { \
+ max_ = *value; \
+ } \
+ } \
}
-}
+
+DATE_TIME_MIN_MAX_FILTER_INSERT(Timestamp, TimestampValue);
+DATE_TIME_MIN_MAX_FILTER_INSERT(Date, DateValue);
#define INSERT_DECIMAL_MINMAX(SIZE) \
do { \
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index 558b65c..27c4957 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -19,6 +19,7 @@
#include "util/min-max-filter.h"
#include "gen-cpp/data_stream_service.pb.h"
+#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/string-value.inline.h"
@@ -359,73 +360,92 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
always_false->Close();
}
-void CheckTimestampVals(
- MinMaxFilter* filter, const TimestampValue& min, const TimestampValue& max) {
- EXPECT_EQ(*reinterpret_cast<const TimestampValue*>(filter->GetMin()), min);
- EXPECT_EQ(*reinterpret_cast<const TimestampValue*>(filter->GetMax()), max);
- EXPECT_FALSE(filter->AlwaysFalse());
- EXPECT_FALSE(filter->AlwaysTrue());
+static TimestampValue ParseSimpleTimestamp(const char* s) {
+ return TimestampValue::ParseSimpleDateFormat(s);
}
+static DateValue ParseSimpleDate(const char* s) {
+ return DateValue::ParseSimpleDateFormat(s, strlen(s), /* accept_time_toks */ true);
+}
+
+#define DATE_TIME_CHECK_VALS(NAME, TYPE) \
+ void Check##NAME##Vals(MinMaxFilter* filter, const TYPE& min, const TYPE& max) { \
+ EXPECT_EQ(*reinterpret_cast<const TYPE*>(filter->GetMin()), min); \
+ EXPECT_EQ(*reinterpret_cast<const TYPE*>(filter->GetMax()), max); \
+ EXPECT_FALSE(filter->AlwaysFalse()); \
+ EXPECT_FALSE(filter->AlwaysTrue()); \
+ }
+
+DATE_TIME_CHECK_VALS(Timestamp, TimestampValue);
+DATE_TIME_CHECK_VALS(Date, DateValue);
+
+#define DATE_TIME_CHECK_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE) \
+ do { \
+ ObjectPool obj_pool; \
+ MemTracker mem_tracker; \
+ ColumnType col_type(PrimitiveType::TYPE_##PRIMITIVE_TYPE); \
+ MinMaxFilter* filter = MinMaxFilter::Create(col_type, &obj_pool, &mem_tracker); \
+ /* Test the behavior of an empty filter. */ \
+ EXPECT_TRUE(filter->AlwaysFalse()); \
+ EXPECT_FALSE(filter->AlwaysTrue()); \
+ MinMaxFilterPB pFilter; \
+ filter->ToProtobuf(&pFilter); \
+ EXPECT_TRUE(pFilter.always_false()); \
+ EXPECT_FALSE(pFilter.always_true()); \
+ EXPECT_FALSE(pFilter.min().has_##PROTOBUF_TYPE##_val()); \
+ EXPECT_FALSE(pFilter.max().has_##PROTOBUF_TYPE##_val()); \
+ MinMaxFilter* empty_filter = \
+ MinMaxFilter::Create(pFilter, col_type, &obj_pool, &mem_tracker); \
+ EXPECT_TRUE(empty_filter->AlwaysFalse()); \
+ EXPECT_FALSE(empty_filter->AlwaysTrue()); \
+ empty_filter->Close(); \
+ /* Now insert some stuff. */ \
+ TYPE t1 = ParseSimple##NAME("2000-01-01 00:00:00"); \
+ filter->Insert(&t1); \
+ Check##NAME##Vals(filter, t1, t1); \
+ TYPE t2 = ParseSimple##NAME("1990-01-01 12:30:00"); \
+ filter->Insert(&t2); \
+ Check##NAME##Vals(filter, t2, t1); \
+ TYPE t3 = ParseSimple##NAME("2001-04-30 05:00:00"); \
+ filter->Insert(&t3); \
+ Check##NAME##Vals(filter, t2, t3); \
+ TYPE t4 = ParseSimple##NAME("2001-04-30 01:00:00"); \
+ filter->Insert(&t4); \
+ Check##NAME##Vals(filter, t2, t3); \
+ /* Check Protobuf. */ \
+ filter->ToProtobuf(&pFilter); \
+ EXPECT_FALSE(pFilter.always_false()); \
+ EXPECT_FALSE(pFilter.always_true()); \
+ EXPECT_EQ(TYPE::FromColumnValuePB(pFilter.min()), t2); \
+ EXPECT_EQ(TYPE::FromColumnValuePB(pFilter.max()), t3); \
+ MinMaxFilter* filter2 = \
+ MinMaxFilter::Create(pFilter, col_type, &obj_pool, &mem_tracker); \
+ Check##NAME##Vals(filter2, t2, t3); \
+ filter2->Close(); \
+ /* Check the behavior of Or. */ \
+ filter->ToProtobuf(&pFilter); \
+ MinMaxFilterPB pFilter1; \
+ t2.ToColumnValuePB(pFilter1.mutable_min()); \
+ t4.ToColumnValuePB(pFilter1.mutable_max()); \
+ MinMaxFilterPB pFilter2; \
+ t1.ToColumnValuePB(pFilter2.mutable_min()); \
+ t3.ToColumnValuePB(pFilter2.mutable_max()); \
+ MinMaxFilter::Or(pFilter1, &pFilter2, col_type); \
+ EXPECT_EQ(TYPE::FromColumnValuePB(pFilter2.min()), t2); \
+ EXPECT_EQ(TYPE::FromColumnValuePB(pFilter2.max()), t3); \
+ filter->Close(); \
+ } while (false)
+
// Tests that a TimestampMinMaxFilter returns the expected min/max after having values
// inserted into it, and that MinMaxFilter::Or works for timestamps.
TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
- ObjectPool obj_pool;
- MemTracker mem_tracker;
- ColumnType timestamp_type(PrimitiveType::TYPE_TIMESTAMP);
- MinMaxFilter* filter = MinMaxFilter::Create(timestamp_type, &obj_pool, &mem_tracker);
-
- // Test the behavior of an empty filter.
- EXPECT_TRUE(filter->AlwaysFalse());
- EXPECT_FALSE(filter->AlwaysTrue());
- MinMaxFilterPB pFilter;
- filter->ToProtobuf(&pFilter);
- EXPECT_TRUE(pFilter.always_false());
- EXPECT_FALSE(pFilter.always_true());
- EXPECT_FALSE(pFilter.min().has_timestamp_val());
- EXPECT_FALSE(pFilter.max().has_timestamp_val());
- MinMaxFilter* empty_filter =
- MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
- EXPECT_TRUE(empty_filter->AlwaysFalse());
- EXPECT_FALSE(empty_filter->AlwaysTrue());
-
- // Now insert some stuff.
- TimestampValue t1 = TimestampValue::ParseSimpleDateFormat("2000-01-01 00:00:00");
- filter->Insert(&t1);
- CheckTimestampVals(filter, t1, t1);
- TimestampValue t2 = TimestampValue::ParseSimpleDateFormat("1990-01-01 12:30:00");
- filter->Insert(&t2);
- CheckTimestampVals(filter, t2, t1);
- TimestampValue t3 = TimestampValue::ParseSimpleDateFormat("2001-04-30 05:00:00");
- filter->Insert(&t3);
- CheckTimestampVals(filter, t2, t3);
- TimestampValue t4 = TimestampValue::ParseSimpleDateFormat("2001-04-30 01:00:00");
- filter->Insert(&t4);
- CheckTimestampVals(filter, t2, t3);
-
- filter->ToProtobuf(&pFilter);
- EXPECT_FALSE(pFilter.always_false());
- EXPECT_FALSE(pFilter.always_true());
- EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.min()), t2);
- EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.max()), t3);
- MinMaxFilter* filter2 =
- MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
- CheckTimestampVals(filter2, t2, t3);
-
- // Check the behavior of Or.
- MinMaxFilterPB pFilter1;
- t2.ToColumnValuePB(pFilter1.mutable_min());
- t4.ToColumnValuePB(pFilter1.mutable_max());
- MinMaxFilterPB pFilter2;
- t1.ToColumnValuePB(pFilter2.mutable_min());
- t3.ToColumnValuePB(pFilter2.mutable_max());
- MinMaxFilter::Or(pFilter1, &pFilter2, timestamp_type);
- EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.min()), t2);
- EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.max()), t3);
+ DATE_TIME_CHECK_FUNCS(Timestamp, TimestampValue, timestamp, TIMESTAMP);
+}
- filter->Close();
- empty_filter->Close();
- filter2->Close();
+// Tests that a DateMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for dates.
+TEST(MinMaxFilterTest, TestDateMinMaxFilter) {
+ DATE_TIME_CHECK_FUNCS(Date, DateValue, date, DATE);
}
#define DECIMAL_CHECK(SIZE) \
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index dee5d40..187d8da 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -21,6 +21,7 @@
#include <unordered_map>
#include "common/object-pool.h"
+#include "runtime/date-value.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/raw-value.h"
#include "runtime/string-value.inline.h"
@@ -41,6 +42,7 @@ static std::unordered_map<int, string> MIN_MAX_FILTER_LLVM_CLASS_NAMES = {
{PrimitiveType::TYPE_DOUBLE, DoubleMinMaxFilter::LLVM_CLASS_NAME},
{PrimitiveType::TYPE_STRING, StringMinMaxFilter::LLVM_CLASS_NAME},
{PrimitiveType::TYPE_TIMESTAMP, TimestampMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_DATE, DateMinMaxFilter::LLVM_CLASS_NAME},
{PrimitiveType::TYPE_DECIMAL, DecimalMinMaxFilter::LLVM_CLASS_NAME}};
static std::unordered_map<int, IRFunction::Type> MIN_MAX_FILTER_IR_FUNCTION_TYPES = {
@@ -52,7 +54,8 @@ static std::unordered_map<int, IRFunction::Type> MIN_MAX_FILTER_IR_FUNCTION_TYPE
{PrimitiveType::TYPE_FLOAT, IRFunction::FLOAT_MIN_MAX_FILTER_INSERT},
{PrimitiveType::TYPE_DOUBLE, IRFunction::DOUBLE_MIN_MAX_FILTER_INSERT},
{PrimitiveType::TYPE_STRING, IRFunction::STRING_MIN_MAX_FILTER_INSERT},
- {PrimitiveType::TYPE_TIMESTAMP, IRFunction::TIMESTAMP_MIN_MAX_FILTER_INSERT}};
+ {PrimitiveType::TYPE_TIMESTAMP, IRFunction::TIMESTAMP_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_DATE, IRFunction::DATE_MIN_MAX_FILTER_INSERT}};
static std::unordered_map<int, IRFunction::Type>
DECIMAL_MIN_MAX_FILTER_IR_FUNCTION_TYPES = {
@@ -332,63 +335,61 @@ void StringMinMaxFilter::SetAlwaysTrue() {
max_.len = 0;
}
-// TIMESTAMP
-const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
- "class.impala::TimestampMinMaxFilter";
-
-TimestampMinMaxFilter::TimestampMinMaxFilter(const MinMaxFilterPB& protobuf) {
- always_false_ = protobuf.always_false();
- if (!always_false_) {
- DCHECK(protobuf.min().has_timestamp_val());
- DCHECK(protobuf.max().has_timestamp_val());
- min_ = TimestampValue::FromColumnValuePB(protobuf.min());
- max_ = TimestampValue::FromColumnValuePB(protobuf.max());
- }
-}
-
-PrimitiveType TimestampMinMaxFilter::type() {
- return PrimitiveType::TYPE_TIMESTAMP;
-}
-
-void TimestampMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
- if (!always_false_) {
- min_.ToColumnValuePB(protobuf->mutable_min());
- max_.ToColumnValuePB(protobuf->mutable_max());
- }
- protobuf->set_always_false(always_false_);
- protobuf->set_always_true(false);
-}
-
-string TimestampMinMaxFilter::DebugString() const {
- stringstream out;
- out << "TimestampMinMaxFilter(min=" << min_ << ", max=" << max_
- << " always_false=" << (always_false_ ? "true" : "false") << ")";
- return out.str();
-}
-
-void TimestampMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
- if (out->always_false()) {
- out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
- out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
- out->set_always_false(false);
- } else {
- TimestampValue in_min_val = TimestampValue::FromColumnValuePB(in.min());
- TimestampValue out_min_val = TimestampValue::FromColumnValuePB(out->min());
- if (in_min_val < out_min_val) {
- out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
- }
- TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
- TimestampValue out_max_val = TimestampValue::FromColumnValuePB(out->max());
- if (in_max_val > out_max_val) {
- out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
- }
+// TIMESTAMP and DATE
+#define DATE_TIME_MIN_MAX_FILTER_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE) \
+ const char* NAME##MinMaxFilter::LLVM_CLASS_NAME = \
+ "class.impala::" #NAME "MinMaxFilter"; \
+ NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) { \
+ always_false_ = protobuf.always_false(); \
+ if (!always_false_) { \
+ DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val()); \
+ DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val()); \
+ min_ = TYPE::FromColumnValuePB(protobuf.min()); \
+ max_ = TYPE::FromColumnValuePB(protobuf.max()); \
+ } \
+ } \
+ PrimitiveType NAME##MinMaxFilter::type() { \
+ return PrimitiveType::TYPE_##PRIMITIVE_TYPE; \
+ } \
+ void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const { \
+ if (!always_false_) { \
+ min_.ToColumnValuePB(protobuf->mutable_min()); \
+ max_.ToColumnValuePB(protobuf->mutable_max()); \
+ } \
+ protobuf->set_always_false(always_false_); \
+ protobuf->set_always_true(false); \
+ } \
+ string NAME##MinMaxFilter::DebugString() const { \
+ stringstream out; \
+ out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_ \
+ << ", always_false=" << (always_false_ ? "true" : "false") << ")"; \
+ return out.str(); \
+ } \
+ void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) { \
+ if (out->always_false()) { \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
+ out->set_always_false(false); \
+ } else { \
+ TYPE in_min_val = TYPE::FromColumnValuePB(in.min()); \
+ TYPE out_min_val = TYPE::FromColumnValuePB(out->min()); \
+ if (in_min_val < out_min_val) { \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
+ } \
+ TYPE in_max_val = TYPE::FromColumnValuePB(in.max()); \
+ TYPE out_max_val = TYPE::FromColumnValuePB(out->max()); \
+ if (in_max_val > out_max_val) { \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
+ } \
+ } \
+ } \
+ void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) { \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
}
-}
-void TimestampMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
- out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
- out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
-}
+DATE_TIME_MIN_MAX_FILTER_FUNCS(Timestamp, TimestampValue, timestamp, TIMESTAMP);
+DATE_TIME_MIN_MAX_FILTER_FUNCS(Date, DateValue, date, DATE);
// DECIMAL
const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxFilter";
@@ -568,6 +569,8 @@ MinMaxFilter* MinMaxFilter::Create(
return pool->Add(new StringMinMaxFilter(mem_tracker));
case PrimitiveType::TYPE_TIMESTAMP:
return pool->Add(new TimestampMinMaxFilter());
+ case PrimitiveType::TYPE_DATE:
+ return pool->Add(new DateMinMaxFilter());
case PrimitiveType::TYPE_DECIMAL:
return pool->Add(new DecimalMinMaxFilter(type.precision));
default:
@@ -597,6 +600,8 @@ MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType ty
return pool->Add(new StringMinMaxFilter(protobuf, mem_tracker));
case PrimitiveType::TYPE_TIMESTAMP:
return pool->Add(new TimestampMinMaxFilter(protobuf));
+ case PrimitiveType::TYPE_DATE:
+ return pool->Add(new DateMinMaxFilter(protobuf));
case PrimitiveType::TYPE_DECIMAL:
return pool->Add(new DecimalMinMaxFilter(protobuf, type.precision));
default:
@@ -657,6 +662,10 @@ void MinMaxFilter::Or(
DCHECK(out->min().has_timestamp_val());
TimestampMinMaxFilter::Or(in, out);
return;
+ } else if (in.min().has_date_val()) {
+ DCHECK(out->min().has_date_val());
+ DateMinMaxFilter::Or(in, out);
+ return;
} else if (in.min().has_decimal_val()) {
DCHECK(out->min().has_decimal_val());
DecimalMinMaxFilter::Or(in, out, columnType.precision);
@@ -703,6 +712,10 @@ void MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
DCHECK(!out->min().has_timestamp_val());
TimestampMinMaxFilter::Copy(in, out);
return;
+ } else if (in.min().has_date_val()) {
+ DCHECK(!out->min().has_date_val());
+ DateMinMaxFilter::Copy(in, out);
+ return;
} else if (in.min().has_decimal_val()) {
DCHECK(!out->min().has_decimal_val());
DecimalMinMaxFilter::Copy(in, out);
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 00a2837..057c6a6 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -20,6 +20,7 @@
#include "gen-cpp/ImpalaInternalService_types.h"
#include "impala-ir/impala-ir-functions.h"
+#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
#include "runtime/string-buffer.h"
#include "runtime/string-value.h"
@@ -213,38 +214,36 @@ class StringMinMaxFilter : public MinMaxFilter {
bool always_true_;
};
-class TimestampMinMaxFilter : public MinMaxFilter {
- public:
- TimestampMinMaxFilter() { always_false_ = true; }
- TimestampMinMaxFilter(const MinMaxFilterPB& protobuf);
- virtual ~TimestampMinMaxFilter() {}
-
- virtual const void* GetMin() const override { return &min_; }
- virtual const void* GetMax() const override { return &max_; }
- virtual PrimitiveType type() override;
-
- virtual void Insert(const void* val) override;
- virtual bool AlwaysTrue() const override { return false; }
- virtual bool AlwaysFalse() const override { return always_false_; }
- virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
- virtual void SetAlwaysTrue() override {
- DCHECK(false) << "Timestamp filters cannot be always true.";
- }
- virtual std::string DebugString() const override;
-
- static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
- static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
-
- /// Struct name in LLVM IR.
- static const char* LLVM_CLASS_NAME;
-
- private:
- TimestampValue min_;
- TimestampValue max_;
+#define DATE_TIME_MIN_MAX_FILTER(NAME, TYPE) \
+ class NAME##MinMaxFilter : public MinMaxFilter { \
+ public: \
+ NAME##MinMaxFilter() { always_false_ = true; } \
+ NAME##MinMaxFilter(const MinMaxFilterPB& protobuf); \
+ virtual ~NAME##MinMaxFilter() {} \
+ virtual const void* GetMin() const override { return &min_; } \
+ virtual const void* GetMax() const override { return &max_; } \
+ virtual PrimitiveType type() override; \
+ virtual void Insert(const void* val) override; \
+ virtual bool AlwaysTrue() const override { return false; } \
+ virtual bool AlwaysFalse() const override { return always_false_; } \
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override; \
+ virtual void SetAlwaysTrue() override { \
+ DCHECK(false) << #NAME << " filters cannot be always true."; \
+ } \
+ virtual std::string DebugString() const override; \
+ static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out); \
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out); \
+ static const char* LLVM_CLASS_NAME; \
+ \
+ private: \
+ TYPE min_; \
+ TYPE max_; \
+ /* True if no rows have been inserted. */ \
+ bool always_false_; \
+ };
- /// True if no rows have been inserted.
- bool always_false_;
-};
+DATE_TIME_MIN_MAX_FILTER(Timestamp, TimestampValue);
+DATE_TIME_MIN_MAX_FILTER(Date, DateValue);
#define DECIMAL_SIZE_4BYTE 4
#define DECIMAL_SIZE_8BYTE 8
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 68fed81..99b862c 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -72,4 +72,5 @@ message ColumnValuePB {
optional string binary_val = 8;
optional string timestamp_val = 9;
optional bytes decimal_val = 10;
+ optional int32 date_val = 11;
}
diff --git a/common/thrift/Data.thrift b/common/thrift/Data.thrift
index 861fd16..f4a5450 100644
--- a/common/thrift/Data.thrift
+++ b/common/thrift/Data.thrift
@@ -30,6 +30,7 @@ struct TColumnValue {
8: optional binary binary_val
9: optional binary timestamp_val
10: optional binary decimal_val
+ 11: optional i32 date_val
}
struct TResultRow {
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 87aa3be..29fef5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -792,8 +792,6 @@ public final class RuntimeFilterGenerator {
&& enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
continue;
}
- // TODO: IMPALA-9294: Support Kudu Date Min/Max Filters
- if (targetExpr.getType().isDate()) continue;
// TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
if (targetExpr.getType().isVarchar()) continue;
SlotRef slotRef = targetExpr.unwrapSlotRef(true);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
index 735dc13..6ec2e16 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
@@ -164,3 +164,54 @@ PLAN-ROOT SINK
tuple-ids=0 row-size=4B cardinality=7.30K
in pipelines: 00(GETNEXT)
====
+# Query with 3-way shuffle joins on Kudu tables with different types of columns.
+select STRAIGHT_JOIN count(*) from functional_kudu.date_tbl a,
+ functional_kudu.date_tbl b,
+ functional_kudu.date_tbl c
+where a.date_col = b.date_col and b.id_col = c.id_col
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=14.62MB mem-reservation=3.88MB thread-reservation=4
+PLAN-ROOT SINK
+| output exprs: count(*)
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+05:AGGREGATE [FINALIZE]
+| output: count(*)
+| mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+| tuple-ids=3 row-size=8B cardinality=1
+| in pipelines: 05(GETNEXT), 00(OPEN)
+|
+04:HASH JOIN [INNER JOIN]
+| hash predicates: b.id_col = c.id_col
+| fk/pk conjuncts: b.id_col = c.id_col
+| runtime filters: RF001[min_max] <- c.id_col
+| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=0,1,2 row-size=16B cardinality=30
+| in pipelines: 00(GETNEXT), 02(OPEN)
+|
+|--02:SCAN KUDU [functional_kudu.date_tbl c]
+| mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+| tuple-ids=2 row-size=4B cardinality=22
+| in pipelines: 02(GETNEXT)
+|
+03:HASH JOIN [INNER JOIN]
+| hash predicates: a.date_col = b.date_col
+| fk/pk conjuncts: none
+| runtime filters: RF003[min_max] <- b.date_col
+| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+| tuple-ids=0,1 row-size=12B cardinality=30
+| in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN KUDU [functional_kudu.date_tbl b]
+| runtime filters: RF001[min_max] -> b.id_col
+| mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
+| tuple-ids=1 row-size=8B cardinality=22
+| in pipelines: 01(GETNEXT)
+|
+00:SCAN KUDU [functional_kudu.date_tbl a]
+ runtime filters: RF003[min_max] -> a.date_col
+ mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=4B cardinality=22
+ in pipelines: 00(GETNEXT)
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test
index 6b8a787..da073ac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/all_runtime_filters.test
@@ -369,8 +369,6 @@ aggregation(SUM, ProbeRows): 732
row_regex: .*2 of 2 Runtime Filters Published.*
====
---- QUERY
-# TODO update the runtime profile for this test case when IMPALA-9294
-# (Support Kudu Date Min/Max Filters) is fixed.
SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
select STRAIGHT_JOIN count(*) from date_tbl a
join [BROADCAST] date_tbl b
@@ -379,7 +377,7 @@ where a.date_col = b.date_col and b.date_col = DATE '2017-11-28'
9
---- RUNTIME_PROFILE
aggregation(SUM, ProbeRows): 3
-row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*2 of 2 Runtime Filters Published.*
====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
index 1f48194..fcbf062 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/min_max_filters.test
@@ -315,6 +315,18 @@ where a.d38_38 = b.d38_38 and b.d38_38 = 0
---- RUNTIME_PROFILE
aggregation(SUM, ProbeRows): 732
====
+---- QUERY
+# IMPALA-9294 Support Kudu Date Min/Max Filters
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select STRAIGHT_JOIN count(*) from date_tbl a join [BROADCAST] date_tbl b
+where a.date_col = b.date_col and b.date_col between DATE '2010-01-01' and
+ DATE '2021-01-01';
+---- RESULTS
+10
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 4
+row_regex: .*1 of 1 Runtime Filter Published.*
+====
---- QUERY
####################################################
@@ -394,4 +406,22 @@ BIGINT
1
1
1
+====
+
+
+---- QUERY
+####################################################
+# Test case 5: three-way parallel shuffle joins.
+###################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select count(*) from
+ functional_kudu.date_tbl a INNER JOIN /* +shuffle */
+ functional_kudu.date_tbl b INNER JOIN /* +shuffle */
+ functional_kudu.date_tbl c
+on a.date_col = b.date_col and b.date_col = c.date_col;
+---- RESULTS
+50
+---- RUNTIME_PROFILE
+aggregation(SUM, ProbeRows): 48
+row_regex: .*1 of 1 Runtime Filter Published.*
====
\ No newline at end of file