You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:49 UTC
[06/16] incubator-impala git commit: IMPALA-4252: Min-max runtime
filters for Kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter-ir.cc b/be/src/util/min-max-filter-ir.cc
new file mode 100644
index 0000000..130d11d
--- /dev/null
+++ b/be/src/util/min-max-filter-ir.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/min-max-filter.h"
+
+#include "runtime/string-value.inline.h"
+
+using std::string;
+
+namespace impala {
+
+#define NUMERIC_MIN_MAX_FILTER_INSERT(NAME, TYPE) \
+ void NAME##MinMaxFilter::Insert(void* val) { \
+ if (val == nullptr) return; \
+ TYPE* value = reinterpret_cast<TYPE*>(val); \
+ if (*value < min_) min_ = *value; \
+ if (*value > max_) max_ = *value; \
+ }
+
+NUMERIC_MIN_MAX_FILTER_INSERT(Bool, bool);
+NUMERIC_MIN_MAX_FILTER_INSERT(TinyInt, int8_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(SmallInt, int16_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(Int, int32_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(BigInt, int64_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(Float, float);
+NUMERIC_MIN_MAX_FILTER_INSERT(Double, double);
+
+void StringMinMaxFilter::Insert(void* val) {
+ if (val == nullptr || always_true_) return;
+ const StringValue* value = reinterpret_cast<const StringValue*>(val);
+ if (always_false_) {
+ min_ = *value;
+ max_ = *value;
+ always_false_ = false;
+ } else {
+ if (*value < min_) {
+ min_ = *value;
+ min_buffer_.Clear();
+ } else if (*value > max_) {
+ max_ = *value;
+ max_buffer_.Clear();
+ }
+ }
+}
+
+void TimestampMinMaxFilter::Insert(void* val) {
+ if (val == nullptr) return;
+ const TimestampValue* value = reinterpret_cast<const TimestampValue*>(val);
+ if (always_false_) {
+ min_ = *value;
+ max_ = *value;
+ always_false_ = false;
+ } else {
+ if (*value < min_) {
+ min_ = *value;
+ } else if (*value > max_) {
+ max_ = *value;
+ }
+ }
+}
+
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
new file mode 100644
index 0000000..23712e6
--- /dev/null
+++ b/be/src/util/min-max-filter-test.cc
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+#include "util/min-max-filter.h"
+
+#include "runtime/string-value.inline.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+#include "util/test-info.h"
+
+DECLARE_bool(enable_webserver);
+
+using namespace impala;
+
+// Tests that a BoolMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for bools.
+TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
+ MemTracker mem_tracker;
+ MemPool mem_pool(&mem_tracker);
+ ObjectPool obj_pool;
+
+ MinMaxFilter* filter =
+ MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &obj_pool, &mem_pool);
+ EXPECT_TRUE(filter->AlwaysFalse());
+ bool b1 = true;
+ filter->Insert(&b1);
+ EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMin()), b1);
+ EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
+ EXPECT_FALSE(filter->AlwaysFalse());
+
+ bool b2 = false;
+ filter->Insert(&b2);
+ EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMin()), b2);
+ EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
+
+ // Check the behavior of Or.
+ TMinMaxFilter tFilter1;
+ tFilter1.min.__set_bool_val(false);
+ tFilter1.max.__set_bool_val(true);
+ TMinMaxFilter tFilter2;
+ tFilter2.min.__set_bool_val(false);
+ tFilter2.max.__set_bool_val(false);
+ MinMaxFilter::Or(tFilter1, &tFilter2);
+ EXPECT_FALSE(tFilter2.min.bool_val);
+ EXPECT_TRUE(tFilter2.max.bool_val);
+}
+
+void CheckIntVals(MinMaxFilter* filter, int32_t min, int32_t max) {
+ EXPECT_EQ(*reinterpret_cast<int32_t*>(filter->GetMin()), min);
+ EXPECT_EQ(*reinterpret_cast<int32_t*>(filter->GetMax()), max);
+ EXPECT_FALSE(filter->AlwaysFalse());
+ EXPECT_FALSE(filter->AlwaysTrue());
+}
+
+// Tests that an IntMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for ints.
+// This also provides coverage for the other numeric MinMaxFilter types as they're
+// generated with macros and the logic is identical.
+TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
+ MemTracker mem_tracker;
+ MemPool mem_pool(&mem_tracker);
+ ObjectPool obj_pool;
+
+ ColumnType int_type(PrimitiveType::TYPE_INT);
+ MinMaxFilter* int_filter = MinMaxFilter::Create(int_type, &obj_pool, &mem_pool);
+
+ // Test the behavior of an empty filter.
+ EXPECT_TRUE(int_filter->AlwaysFalse());
+ EXPECT_FALSE(int_filter->AlwaysTrue());
+ TMinMaxFilter tFilter;
+ int_filter->ToThrift(&tFilter);
+ EXPECT_TRUE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_FALSE(tFilter.min.__isset.int_val);
+ EXPECT_FALSE(tFilter.max.__isset.int_val);
+ MinMaxFilter* empty_filter =
+ MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+ EXPECT_TRUE(empty_filter->AlwaysFalse());
+ EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+ // Now insert some stuff.
+ int32_t i1 = 10;
+ int_filter->Insert(&i1);
+ CheckIntVals(int_filter, i1, i1);
+ int32_t i2 = 15;
+ int_filter->Insert(&i2);
+ CheckIntVals(int_filter, i1, i2);
+ int32_t i3 = 12;
+ int_filter->Insert(&i3);
+ CheckIntVals(int_filter, i1, i2);
+ int32_t i4 = 8;
+ int_filter->Insert(&i4);
+ CheckIntVals(int_filter, i4, i2);
+
+ int_filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_EQ(tFilter.min.int_val, i4);
+ EXPECT_EQ(tFilter.max.int_val, i2);
+ MinMaxFilter* int_filter2 =
+ MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+ CheckIntVals(int_filter2, i4, i2);
+
+ // Check the behavior of Or.
+ TMinMaxFilter tFilter1;
+ tFilter1.min.__set_int_val(4);
+ tFilter1.max.__set_int_val(8);
+ TMinMaxFilter tFilter2;
+ tFilter2.min.__set_int_val(2);
+ tFilter2.max.__set_int_val(7);
+ MinMaxFilter::Or(tFilter1, &tFilter2);
+ EXPECT_EQ(tFilter2.min.int_val, 2);
+ EXPECT_EQ(tFilter2.max.int_val, 8);
+}
+
+void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max) {
+ StringValue actual_min = *reinterpret_cast<StringValue*>(filter->GetMin());
+ StringValue actual_max = *reinterpret_cast<StringValue*>(filter->GetMax());
+ StringValue expected_min(min);
+ StringValue expected_max(max);
+ EXPECT_EQ(actual_min, expected_min);
+ EXPECT_EQ(actual_max, expected_max);
+ EXPECT_FALSE(filter->AlwaysTrue());
+ EXPECT_FALSE(filter->AlwaysFalse());
+}
+
+// Tests that a StringMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for strings.
+// Also tests truncation behavior when inserted strings are larger than MAX_BOUND_LENGTH
+// and that the filter is disabled if there's not enough mem to store the min/max.
+TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
+ ObjectPool obj_pool;
+ MemTracker mem_tracker;
+ MemPool mem_pool(&mem_tracker);
+
+ ColumnType string_type(PrimitiveType::TYPE_STRING);
+ MinMaxFilter* filter = MinMaxFilter::Create(string_type, &obj_pool, &mem_pool);
+
+ // Test the behavior of an empty filter.
+ EXPECT_TRUE(filter->AlwaysFalse());
+ EXPECT_FALSE(filter->AlwaysTrue());
+ filter->MaterializeValues();
+ EXPECT_TRUE(filter->AlwaysFalse());
+ EXPECT_FALSE(filter->AlwaysTrue());
+ TMinMaxFilter tFilter;
+ filter->ToThrift(&tFilter);
+ EXPECT_TRUE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+
+ MinMaxFilter* empty_filter =
+ MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+ EXPECT_TRUE(empty_filter->AlwaysFalse());
+ EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+ // Now insert some stuff.
+ string c = "c";
+ StringValue cVal(c);
+ filter->Insert(&cVal);
+ filter->MaterializeValues();
+ CheckStringVals(filter, c, c);
+
+ string d = "d";
+ StringValue dVal(d);
+ filter->Insert(&dVal);
+ filter->MaterializeValues();
+ CheckStringVals(filter, c, d);
+
+ string cc = "cc";
+ StringValue ccVal(cc);
+ filter->Insert(&ccVal);
+ filter->MaterializeValues();
+ CheckStringVals(filter, c, d);
+
+ filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_EQ(tFilter.min.string_val, c);
+ EXPECT_EQ(tFilter.max.string_val, d);
+
+ // Test that strings longer than 1024 are truncated.
+ string b1030(1030, 'b');
+ StringValue b1030Val(b1030);
+ filter->Insert(&b1030Val);
+ filter->MaterializeValues();
+ string b1024(1024, 'b');
+ CheckStringVals(filter, b1024, d);
+
+ string e1030(1030, 'e');
+ StringValue e1030Val(e1030);
+ filter->Insert(&e1030Val);
+ filter->MaterializeValues();
+ string e1024(1024, 'e');
+ // For max, after truncating the final char is increased by one.
+ e1024[1023] = 'f';
+ CheckStringVals(filter, b1024, e1024);
+
+ string trailMaxChar(1030, 'f');
+ int trailIndex = 1020;
+ for (int i = trailIndex; i < 1030; ++i) trailMaxChar[i] = -1;
+ StringValue trailMaxCharVal(trailMaxChar);
+ filter->Insert(&trailMaxCharVal);
+ filter->MaterializeValues();
+ // Check that when adding one for max, if the final char is the max char it overflows
+ // and carries.
+ string truncTrailMaxChar(1024, 'f');
+ truncTrailMaxChar[trailIndex - 1] = 'g';
+ for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
+ CheckStringVals(filter, b1024, truncTrailMaxChar);
+
+ filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_EQ(tFilter.min.string_val, b1024);
+ EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
+
+ MinMaxFilter* filter2 =
+ MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+ CheckStringVals(filter2, b1024, truncTrailMaxChar);
+
+ // Check that if the entire string is the max char and therefore after truncating for
+ // max we can't add one, the filter is disabled.
+ string allMaxChar(1030, -1);
+ StringValue allMaxCharVal(allMaxChar);
+ filter->Insert(&allMaxCharVal);
+ filter->MaterializeValues();
+ EXPECT_TRUE(filter->AlwaysTrue());
+
+ // We should still be able to insert into a disabled filter.
+ filter->Insert(&cVal);
+ EXPECT_TRUE(filter->AlwaysTrue());
+
+ filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_TRUE(tFilter.always_true);
+
+ MinMaxFilter* always_true_filter =
+ MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+ EXPECT_FALSE(always_true_filter->AlwaysFalse());
+ EXPECT_TRUE(always_true_filter->AlwaysTrue());
+
+ mem_pool.FreeAll();
+
+ // Check that a filter that hits the mem limit is disabled.
+ MemTracker limit_mem_tracker(1);
+ MemPool limit_mem_pool(&limit_mem_tracker);
+ // We do not want to start the webserver.
+ FLAGS_enable_webserver = false;
+ std::unique_ptr<TestEnv> env;
+ env.reset(new TestEnv());
+ ASSERT_OK(env->Init());
+
+ MinMaxFilter* limit_filter =
+ MinMaxFilter::Create(string_type, &obj_pool, &limit_mem_pool);
+ EXPECT_FALSE(limit_filter->AlwaysTrue());
+ limit_filter->Insert(&cVal);
+ limit_filter->MaterializeValues();
+ EXPECT_TRUE(limit_filter->AlwaysTrue());
+ limit_filter->Insert(&dVal);
+ limit_filter->MaterializeValues();
+ EXPECT_TRUE(limit_filter->AlwaysTrue());
+
+ limit_filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_TRUE(tFilter.always_true);
+
+ // Check the behavior of Or.
+ TMinMaxFilter tFilter1;
+ tFilter1.min.__set_string_val("a");
+ tFilter1.max.__set_string_val("d");
+ TMinMaxFilter tFilter2;
+ tFilter2.min.__set_string_val("b");
+ tFilter2.max.__set_string_val("e");
+ MinMaxFilter::Or(tFilter1, &tFilter2);
+ EXPECT_EQ(tFilter2.min.string_val, "a");
+ EXPECT_EQ(tFilter2.max.string_val, "e");
+}
+
+void CheckTimestampVals(
+ MinMaxFilter* filter, const TimestampValue& min, const TimestampValue& max) {
+ EXPECT_EQ(*reinterpret_cast<TimestampValue*>(filter->GetMin()), min);
+ EXPECT_EQ(*reinterpret_cast<TimestampValue*>(filter->GetMax()), max);
+ EXPECT_FALSE(filter->AlwaysFalse());
+ EXPECT_FALSE(filter->AlwaysTrue());
+}
+
+// Tests that a TimestampMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for timestamps.
+TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
+ ObjectPool obj_pool;
+ MemTracker mem_tracker;
+ MemPool mem_pool(&mem_tracker);
+ ColumnType timestamp_type(PrimitiveType::TYPE_TIMESTAMP);
+ MinMaxFilter* filter = MinMaxFilter::Create(timestamp_type, &obj_pool, &mem_pool);
+
+ // Test the behavior of an empty filter.
+ EXPECT_TRUE(filter->AlwaysFalse());
+ EXPECT_FALSE(filter->AlwaysTrue());
+ TMinMaxFilter tFilter;
+ filter->ToThrift(&tFilter);
+ EXPECT_TRUE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
+ EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
+ MinMaxFilter* empty_filter =
+ MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+ EXPECT_TRUE(empty_filter->AlwaysFalse());
+ EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+ // Now insert some stuff.
+ TimestampValue t1 = TimestampValue::Parse("2000-01-01 00:00:00");
+ filter->Insert(&t1);
+ CheckTimestampVals(filter, t1, t1);
+ TimestampValue t2 = TimestampValue::Parse("1990-01-01 12:30:00");
+ filter->Insert(&t2);
+ CheckTimestampVals(filter, t2, t1);
+ TimestampValue t3 = TimestampValue::Parse("2001-04-30 05:00:00");
+ filter->Insert(&t3);
+ CheckTimestampVals(filter, t2, t3);
+ TimestampValue t4 = TimestampValue::Parse("2001-04-30 01:00:00");
+ filter->Insert(&t4);
+ CheckTimestampVals(filter, t2, t3);
+
+ filter->ToThrift(&tFilter);
+ EXPECT_FALSE(tFilter.always_false);
+ EXPECT_FALSE(tFilter.always_true);
+ EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
+ EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
+ MinMaxFilter* filter2 =
+ MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+ CheckTimestampVals(filter2, t2, t3);
+
+ // Check the behavior of Or.
+ TMinMaxFilter tFilter1;
+ t2.ToTColumnValue(&tFilter1.min);
+ t4.ToTColumnValue(&tFilter1.max);
+ TMinMaxFilter tFilter2;
+ t1.ToTColumnValue(&tFilter2.min);
+ t3.ToTColumnValue(&tFilter2.max);
+ MinMaxFilter::Or(tFilter1, &tFilter2);
+ EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
+ EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+}
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
+ InitFeSupport();
+ return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
new file mode 100644
index 0000000..f50f896
--- /dev/null
+++ b/be/src/util/min-max-filter.cc
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/min-max-filter.h"
+
+#include <sstream>
+#include <unordered_map>
+
+#include "common/object-pool.h"
+#include "runtime/raw-value.h"
+#include "runtime/string-value.inline.h"
+#include "runtime/timestamp-value.inline.h"
+
+using std::numeric_limits;
+using std::stringstream;
+
+namespace impala {
+
+static std::unordered_map<int, string> MIN_MAX_FILTER_LLVM_CLASS_NAMES = {
+ {PrimitiveType::TYPE_BOOLEAN, BoolMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_TINYINT, TinyIntMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_SMALLINT, SmallIntMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_INT, IntMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_BIGINT, BigIntMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_FLOAT, FloatMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_DOUBLE, DoubleMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_STRING, StringMinMaxFilter::LLVM_CLASS_NAME},
+ {PrimitiveType::TYPE_TIMESTAMP, TimestampMinMaxFilter::LLVM_CLASS_NAME}};
+
+static std::unordered_map<int, IRFunction::Type> MIN_MAX_FILTER_IR_FUNCTION_TYPES = {
+ {PrimitiveType::TYPE_BOOLEAN, IRFunction::BOOL_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_TINYINT, IRFunction::TINYINT_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_SMALLINT, IRFunction::SMALLINT_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_INT, IRFunction::INT_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_BIGINT, IRFunction::BIGINT_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_FLOAT, IRFunction::FLOAT_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_DOUBLE, IRFunction::DOUBLE_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_STRING, IRFunction::STRING_MIN_MAX_FILTER_INSERT},
+ {PrimitiveType::TYPE_TIMESTAMP, IRFunction::TIMESTAMP_MIN_MAX_FILTER_INSERT}};
+
+string MinMaxFilter::GetLlvmClassName(PrimitiveType type) {
+ return MIN_MAX_FILTER_LLVM_CLASS_NAMES[type];
+}
+
+IRFunction::Type MinMaxFilter::GetInsertIRFunctionType(PrimitiveType type) {
+ return MIN_MAX_FILTER_IR_FUNCTION_TYPES[type];
+}
+
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE) \
+ const char* NAME##MinMaxFilter::LLVM_CLASS_NAME = \
+ "class.impala::" #NAME "MinMaxFilter"; \
+ NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) { \
+ DCHECK(!thrift.always_true); \
+ if (thrift.always_false) { \
+ min_ = numeric_limits<TYPE>::max(); \
+ max_ = numeric_limits<TYPE>::lowest(); \
+ } else { \
+ DCHECK(thrift.__isset.min); \
+ DCHECK(thrift.__isset.max); \
+ DCHECK(thrift.min.__isset.THRIFT_TYPE##_val); \
+ DCHECK(thrift.max.__isset.THRIFT_TYPE##_val); \
+ min_ = thrift.min.THRIFT_TYPE##_val; \
+ max_ = thrift.max.THRIFT_TYPE##_val; \
+ } \
+ } \
+ PrimitiveType NAME##MinMaxFilter::type() { \
+ return PrimitiveType::TYPE_##PRIMITIVE_TYPE; \
+ } \
+ void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const { \
+ if (!AlwaysFalse()) { \
+ thrift->min.__set_##THRIFT_TYPE##_val(min_); \
+ thrift->__isset.min = true; \
+ thrift->max.__set_##THRIFT_TYPE##_val(max_); \
+ thrift->__isset.max = true; \
+ } \
+ thrift->__set_always_false(AlwaysFalse()); \
+ thrift->__set_always_true(false); \
+ } \
+ string NAME##MinMaxFilter::DebugString() const { \
+ stringstream out; \
+ out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_ \
+ << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")"; \
+ return out.str(); \
+ } \
+ void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) { \
+ if (out->always_false) { \
+ out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val); \
+ out->__isset.min = true; \
+ out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val); \
+ out->__isset.max = true; \
+ out->__set_always_false(false); \
+ } else { \
+ out->min.__set_##THRIFT_TYPE##_val( \
+ std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val)); \
+ out->max.__set_##THRIFT_TYPE##_val( \
+ std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val)); \
+ } \
+ } \
+ void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
+ out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val); \
+ out->__isset.min = true; \
+ out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val); \
+ out->__isset.max = true; \
+ }
+
+NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
+NUMERIC_MIN_MAX_FILTER_FUNCS(TinyInt, int8_t, byte, TINYINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(SmallInt, int16_t, short, SMALLINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Int, int32_t, int, INT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(BigInt, int64_t, long, BIGINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Float, float, double, FLOAT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Double, double, double, DOUBLE);
+
+int64_t GetIntTypeMax(const ColumnType& type) {
+ switch (type.type) {
+ case TYPE_TINYINT:
+ return numeric_limits<int8_t>::max();
+ case TYPE_SMALLINT:
+ return numeric_limits<int16_t>::max();
+ case TYPE_INT:
+ return numeric_limits<int32_t>::max();
+ case TYPE_BIGINT:
+ return numeric_limits<int64_t>::max();
+ default:
+ DCHECK(false) << "Not an int type: " << type;
+ }
+ return -1;
+}
+
+int64_t GetIntTypeMin(const ColumnType& type) {
+ switch (type.type) {
+ case TYPE_TINYINT:
+ return numeric_limits<int8_t>::lowest();
+ case TYPE_SMALLINT:
+ return numeric_limits<int16_t>::lowest();
+ case TYPE_INT:
+ return numeric_limits<int32_t>::lowest();
+ case TYPE_BIGINT:
+ return numeric_limits<int64_t>::lowest();
+ default:
+ DCHECK(false) << "Not an int type: " << type;
+ }
+ return -1;
+}
+
+#define NUMERIC_MIN_MAX_FILTER_CAST(NAME) \
+ bool NAME##MinMaxFilter::GetCastIntMinMax( \
+ const ColumnType& type, int64_t* out_min, int64_t* out_max) { \
+ int64_t type_min = GetIntTypeMin(type); \
+ int64_t type_max = GetIntTypeMax(type); \
+ if (min_ < type_min) { \
+ *out_min = type_min; \
+ } else if (min_ > type_max) { \
+ return false; \
+ } else { \
+ *out_min = min_; \
+ } \
+ if (max_ > type_max) { \
+ *out_max = type_max; \
+ } else if (max_ < type_min) { \
+ return false; \
+ } else { \
+ *out_max = max_; \
+ } \
+ return true; \
+ }
+
+NUMERIC_MIN_MAX_FILTER_CAST(TinyInt);
+NUMERIC_MIN_MAX_FILTER_CAST(SmallInt);
+NUMERIC_MIN_MAX_FILTER_CAST(Int);
+NUMERIC_MIN_MAX_FILTER_CAST(BigInt);
+
+#define NUMERIC_MIN_MAX_FILTER_NO_CAST(NAME) \
+ bool NAME##MinMaxFilter::GetCastIntMinMax( \
+ const ColumnType& type, int64_t* out_min, int64_t* out_max) { \
+ DCHECK(false) << "Casting min-max filters of type " << #NAME << " not supported."; \
+ return true; \
+ }
+
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Bool);
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Float);
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Double);
+
+// STRING
+const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFilter";
+const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
+
+StringMinMaxFilter::StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool)
+ : min_buffer_(mem_pool), max_buffer_(mem_pool) {
+ always_false_ = thrift.always_false;
+ always_true_ = thrift.always_true;
+ if (!always_true_ && !always_false_) {
+ DCHECK(thrift.__isset.min);
+ DCHECK(thrift.__isset.max);
+ DCHECK(thrift.min.__isset.string_val);
+ DCHECK(thrift.max.__isset.string_val);
+ min_ = StringValue(thrift.min.string_val);
+ max_ = StringValue(thrift.max.string_val);
+ CopyToBuffer(&min_buffer_, &min_, min_.len);
+ CopyToBuffer(&max_buffer_, &max_, max_.len);
+ }
+}
+
+PrimitiveType StringMinMaxFilter::type() {
+ return PrimitiveType::TYPE_STRING;
+}
+
+void StringMinMaxFilter::MaterializeValues() {
+ if (always_true_ || always_false_) return; // No bounds to copy.
+ if (min_buffer_.IsEmpty()) {
+ if (min_.len > MAX_BOUND_LENGTH) {
+ // Truncating 'min_' gives a valid min bound as the result will be <= 'min_'.
+ CopyToBuffer(&min_buffer_, &min_, MAX_BOUND_LENGTH);
+ } else {
+ CopyToBuffer(&min_buffer_, &min_, min_.len);
+ }
+ }
+ if (max_buffer_.IsEmpty()) {
+ if (max_.len > MAX_BOUND_LENGTH) {
+ CopyToBuffer(&max_buffer_, &max_, MAX_BOUND_LENGTH);
+ if (always_true_) return; // CopyToBuffer() disabled the filter on failure.
+ // After truncating 'max_', add 1 to the string (carrying past 0xFF bytes) so the
+ // bound stays > the original. Bytes are compared as uint8_t since the signedness
+ // of 'char' is platform-dependent. If all bytes are 0xFF, become always_true.
+ int i = MAX_BOUND_LENGTH - 1;
+ while (i >= 0 && static_cast<uint8_t>(max_buffer_.buffer()[i]) == 0xFF) {
+ max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
+ --i;
+ }
+ if (i == -1) {
+ SetAlwaysTrue();
+ return;
+ }
+ max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
+ } else {
+ CopyToBuffer(&max_buffer_, &max_, max_.len);
+ }
+ }
+}
+
+void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+ if (!always_true_ && !always_false_) {
+ thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
+ thrift->min.__isset.string_val = true;
+ thrift->__isset.min = true;
+ thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
+ thrift->max.__isset.string_val = true;
+ thrift->__isset.max = true;
+ }
+ thrift->__set_always_false(always_false_);
+ thrift->__set_always_true(always_true_);
+}
+
+string StringMinMaxFilter::DebugString() const {
+ stringstream out;
+ out << "StringMinMaxFilter(min=" << min_ << ", max=" << max_
+ << ", always_false=" << (always_false_ ? "true" : "false")
+ << ", always_true=" << (always_true_ ? "true" : "false") << ")";
+ return out.str();
+}
+
+void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ if (out->always_false) {
+ out->min.__set_string_val(in.min.string_val);
+ out->__isset.min = true;
+ out->max.__set_string_val(in.max.string_val);
+ out->__isset.max = true;
+ out->__set_always_false(false);
+ } else {
+ StringValue in_min_val = StringValue(in.min.string_val);
+ StringValue out_min_val = StringValue(out->min.string_val);
+ if (in_min_val < out_min_val) out->min.__set_string_val(in.min.string_val);
+ StringValue in_max_val = StringValue(in.max.string_val);
+ StringValue out_max_val = StringValue(out->max.string_val);
+ if (in_max_val > out_max_val) out->max.__set_string_val(in.max.string_val);
+ }
+}
+
+void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ out->min.__set_string_val(in.min.string_val);
+ out->__isset.min = true;
+ out->max.__set_string_val(in.max.string_val);
+ out->__isset.max = true;
+}
+
+void StringMinMaxFilter::CopyToBuffer(
+ StringBuffer* buffer, StringValue* value, int64_t len) {
+ if (value->ptr == buffer->buffer()) return;
+ buffer->Clear();
+ if (!buffer->Append(value->ptr, len).ok()) {
+ // If Append() fails, for example because we're out of memory, disable the filter.
+ SetAlwaysTrue();
+ return;
+ }
+ value->ptr = buffer->buffer();
+ value->len = len;
+}
+
+void StringMinMaxFilter::SetAlwaysTrue() {
+ always_true_ = true;
+ max_buffer_.Clear();
+ min_buffer_.Clear();
+ min_.ptr = nullptr;
+ min_.len = 0;
+ max_.ptr = nullptr;
+ max_.len = 0;
+}
+
+// TIMESTAMP
+const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
+ "class.impala::TimestampMinMaxFilter";
+
+TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
+ always_false_ = thrift.always_false;
+ if (!always_false_) {
+ DCHECK(thrift.min.__isset.timestamp_val);
+ DCHECK(thrift.max.__isset.timestamp_val);
+ min_ = TimestampValue::FromTColumnValue(thrift.min);
+ max_ = TimestampValue::FromTColumnValue(thrift.max);
+ }
+}
+
+PrimitiveType TimestampMinMaxFilter::type() {
+ return PrimitiveType::TYPE_TIMESTAMP;
+}
+
+void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+ if (!always_false_) {
+ min_.ToTColumnValue(&thrift->min);
+ thrift->__isset.min = true;
+ max_.ToTColumnValue(&thrift->max);
+ thrift->__isset.max = true;
+ }
+ thrift->__set_always_false(always_false_);
+ thrift->__set_always_true(false);
+}
+
+string TimestampMinMaxFilter::DebugString() const {
+ stringstream out; // Fields are ", "-separated, as in the other DebugString() impls.
+ out << "TimestampMinMaxFilter(min=" << min_ << ", max=" << max_
+ << ", always_false=" << (always_false_ ? "true" : "false") << ")";
+ return out.str();
+}
+
+void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ if (out->always_false) {
+ out->min.__set_timestamp_val(in.min.timestamp_val);
+ out->__isset.min = true;
+ out->max.__set_timestamp_val(in.max.timestamp_val);
+ out->__isset.max = true;
+ out->__set_always_false(false);
+ } else {
+ TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
+ TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
+ if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
+ TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
+ TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
+ if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
+ }
+}
+
+void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ out->min.__set_timestamp_val(in.min.timestamp_val);
+ out->__isset.min = true;
+ out->max.__set_timestamp_val(in.max.timestamp_val);
+ out->__isset.max = true;
+}
+
+// MinMaxFilter
+bool MinMaxFilter::GetCastIntMinMax(
+ const ColumnType& type, int64_t* out_min, int64_t* out_max) {
+ DCHECK(false) << "Casting min-max filters of type " << this->type()
+ << " not supported.";
+ return true;
+}
+
+MinMaxFilter* MinMaxFilter::Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+ switch (type.type) {
+ case PrimitiveType::TYPE_BOOLEAN:
+ return pool->Add(new BoolMinMaxFilter());
+ case PrimitiveType::TYPE_TINYINT:
+ return pool->Add(new TinyIntMinMaxFilter());
+ case PrimitiveType::TYPE_SMALLINT:
+ return pool->Add(new SmallIntMinMaxFilter());
+ case PrimitiveType::TYPE_INT:
+ return pool->Add(new IntMinMaxFilter());
+ case PrimitiveType::TYPE_BIGINT:
+ return pool->Add(new BigIntMinMaxFilter());
+ case PrimitiveType::TYPE_FLOAT:
+ return pool->Add(new FloatMinMaxFilter());
+ case PrimitiveType::TYPE_DOUBLE:
+ return pool->Add(new DoubleMinMaxFilter());
+ case PrimitiveType::TYPE_STRING:
+ return pool->Add(new StringMinMaxFilter(mem_pool));
+ case PrimitiveType::TYPE_TIMESTAMP:
+ return pool->Add(new TimestampMinMaxFilter());
+ default:
+ DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
+ }
+ return nullptr;
+}
+
+MinMaxFilter* MinMaxFilter::Create(
+ const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+ switch (type.type) {
+ case PrimitiveType::TYPE_BOOLEAN:
+ return pool->Add(new BoolMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_TINYINT:
+ return pool->Add(new TinyIntMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_SMALLINT:
+ return pool->Add(new SmallIntMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_INT:
+ return pool->Add(new IntMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_BIGINT:
+ return pool->Add(new BigIntMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_FLOAT:
+ return pool->Add(new FloatMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_DOUBLE:
+ return pool->Add(new DoubleMinMaxFilter(thrift));
+ case PrimitiveType::TYPE_STRING:
+ return pool->Add(new StringMinMaxFilter(thrift, mem_pool));
+ case PrimitiveType::TYPE_TIMESTAMP:
+ return pool->Add(new TimestampMinMaxFilter(thrift));
+ default:
+ DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
+ }
+ return nullptr;
+}
+
+void MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ if (in.always_false || out->always_true) return; // 'in' cannot widen 'out'.
+ if (in.always_true) {
+ out->__set_always_true(true);
+ return;
+ }
+ if (in.min.__isset.bool_val) {
+ DCHECK(out->always_false || out->min.__isset.bool_val); // 'out' may be empty.
+ BoolMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.byte_val) {
+ DCHECK(out->always_false || out->min.__isset.byte_val);
+ TinyIntMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.short_val) {
+ DCHECK(out->always_false || out->min.__isset.short_val);
+ SmallIntMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.int_val) {
+ DCHECK(out->always_false || out->min.__isset.int_val);
+ IntMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.long_val) {
+ DCHECK(out->always_false || out->min.__isset.long_val);
+ BigIntMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.double_val) {
+ // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+ DCHECK(out->always_false || out->min.__isset.double_val);
+ DoubleMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.string_val) {
+ DCHECK(out->always_false || out->min.__isset.string_val);
+ StringMinMaxFilter::Or(in, out);
+ return;
+ } else if (in.min.__isset.timestamp_val) {
+ DCHECK(out->always_false || out->min.__isset.timestamp_val);
+ TimestampMinMaxFilter::Or(in, out);
+ return;
+ }
+ DCHECK(false) << "Unsupported MinMaxFilter type.";
+}
+
+void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+ out->__set_always_false(in.always_false);
+ out->__set_always_true(in.always_true);
+ if (in.always_false || in.always_true) return;
+ if (in.min.__isset.bool_val) {
+ DCHECK(!out->min.__isset.bool_val);
+ BoolMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.byte_val) {
+ DCHECK(!out->min.__isset.byte_val);
+ TinyIntMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.short_val) {
+ DCHECK(!out->min.__isset.short_val);
+ SmallIntMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.int_val) {
+ DCHECK(!out->min.__isset.int_val);
+ IntMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.long_val) {
+ // BIGINT bounds only; timestamp bounds use 'timestamp_val' and are handled below.
+ DCHECK(!out->min.__isset.long_val);
+ BigIntMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.double_val) {
+ // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+ DCHECK(!out->min.__isset.double_val);
+ DoubleMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.string_val) {
+ DCHECK(!out->min.__isset.string_val);
+ StringMinMaxFilter::Copy(in, out);
+ return;
+ } else if (in.min.__isset.timestamp_val) {
+ DCHECK(!out->min.__isset.timestamp_val);
+ TimestampMinMaxFilter::Copy(in, out);
+ return;
+ }
+ DCHECK(false) << "Unsupported MinMaxFilter type.";
+}
+
+} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
new file mode 100644
index 0000000..556f5fa
--- /dev/null
+++ b/be/src/util/min-max-filter.h
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_MIN_MAX_FILTER_H
+#define IMPALA_UTIL_MIN_MAX_FILTER_H
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "impala-ir/impala-ir-functions.h"
+#include "runtime/string-buffer.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+
+namespace impala {
+
+class MemPool;
+class ObjectPool;
+
+/// A MinMaxFilter tracks the min and max currently seen values in a data set for use in
+/// runtime filters.
+///
+/// Filters are constructed using MinMaxFilter::Create() which returns a MinMaxFilter of
+/// the appropriate type. Values can then be added using Insert(), and the min and max can
+/// be retrieved using GetMin()/GetMax().
+///
+/// MinMaxFilters ignore NULL values, and so are only appropriate to use as a runtime
+/// filter if the join predicate is '=' and not 'is not distinct from'.
+class MinMaxFilter {
+ public:
+ virtual ~MinMaxFilter() {}
+
+ /// Returns the min/max values in the tuple slot representation. It is not valid to call
+ /// these functions if AlwaysFalse() returns true.
+ virtual void* GetMin() = 0;
+ virtual void* GetMax() = 0;
+
+ /// Returns the min/max values in the out parameters 'out_min'/'out_max', converted to
+ /// fit into 'type', eg. if the calculated max value is greater than the max value for
+ /// 'type', the returned max is the max for 'type'. Returns false if the entire range
+ /// from the calculated min to max is outside the range for 'type'. May only be called
+ /// for integer-typed filters.
+ virtual bool GetCastIntMinMax(
+ const ColumnType& type, int64_t* out_min, int64_t* out_max);
+
+ virtual PrimitiveType type() = 0;
+
+ /// Add a new value, updating the current min/max.
+ virtual void Insert(void* val) = 0;
+
+ /// If true, this filter allows all rows to pass.
+ virtual bool AlwaysTrue() const = 0;
+
+ /// If true, this filter doesn't allow any rows to pass.
+ virtual bool AlwaysFalse() const = 0;
+
+ /// Materialize filter values by copying any values stored by filters into memory owned
+ /// by the filter. Filters may assume that the memory for Insert()-ed values stays valid
+ /// until this is called.
+ virtual void MaterializeValues() {}
+
+ /// Convert this filter to a thrift representation.
+ virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
+
+ virtual std::string DebugString() const = 0;
+
+ /// Returns a new MinMaxFilter with the given type, allocated from 'pool'.
+ static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+
+ /// Returns a new MinMaxFilter created from the thrift representation, allocated from
+ /// 'pool'.
+ static MinMaxFilter* Create(
+ const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+
+ /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
+ static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+ /// Copies the contents of 'in' into 'out'.
+ static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+ /// Returns the LLVM_CLASS_NAME for the given type.
+ static std::string GetLlvmClassName(PrimitiveType type);
+
+ /// Returns the IRFunction::Type for Insert() for the given type.
+ static IRFunction::Type GetInsertIRFunctionType(PrimitiveType type);
+};
+
+#define NUMERIC_MIN_MAX_FILTER(NAME, TYPE) \
+ class NAME##MinMaxFilter : public MinMaxFilter { \
+ public: \
+ NAME##MinMaxFilter() { \
+ min_ = std::numeric_limits<TYPE>::max(); \
+ max_ = std::numeric_limits<TYPE>::lowest(); \
+ } \
+ NAME##MinMaxFilter(const TMinMaxFilter& thrift); \
+ virtual ~NAME##MinMaxFilter() {} \
+ virtual void* GetMin() override { return &min_; } \
+ virtual void* GetMax() override { return &max_; } \
+ virtual bool GetCastIntMinMax( \
+ const ColumnType& type, int64_t* out_min, int64_t* out_max) override; \
+ virtual PrimitiveType type() override; \
+ virtual void Insert(void* val) override; \
+ virtual bool AlwaysTrue() const override { return false; } \
+ virtual bool AlwaysFalse() const override { \
+ return min_ == std::numeric_limits<TYPE>::max() \
+ && max_ == std::numeric_limits<TYPE>::lowest(); \
+ } \
+ virtual void ToThrift(TMinMaxFilter* thrift) const override; \
+ virtual std::string DebugString() const override; \
+ static void Or(const TMinMaxFilter& in, TMinMaxFilter* out); \
+ static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out); \
+ static const char* LLVM_CLASS_NAME; \
+ \
+ private: \
+ TYPE min_; \
+ TYPE max_; \
+ };
+
+NUMERIC_MIN_MAX_FILTER(Bool, bool);
+NUMERIC_MIN_MAX_FILTER(TinyInt, int8_t);
+NUMERIC_MIN_MAX_FILTER(SmallInt, int16_t);
+NUMERIC_MIN_MAX_FILTER(Int, int32_t);
+NUMERIC_MIN_MAX_FILTER(BigInt, int64_t);
+NUMERIC_MIN_MAX_FILTER(Float, float);
+NUMERIC_MIN_MAX_FILTER(Double, double);
+
+class StringMinMaxFilter : public MinMaxFilter {
+ public:
+ StringMinMaxFilter(MemPool* mem_pool)
+ : min_buffer_(mem_pool),
+ max_buffer_(mem_pool),
+ always_false_(true),
+ always_true_(false) {}
+ StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool);
+ virtual ~StringMinMaxFilter() {}
+
+ virtual void* GetMin() override { return &min_; }
+ virtual void* GetMax() override { return &max_; }
+ virtual PrimitiveType type() override;
+
+ virtual void Insert(void* val) override;
+ virtual bool AlwaysTrue() const override { return always_true_; }
+ virtual bool AlwaysFalse() const override { return always_false_; }
+
+ /// Copies the values pointed to by 'min_'/'max_' into 'min_buffer_'/'max_buffer_',
+ /// truncating them if necessary.
+ virtual void MaterializeValues() override;
+
+ virtual void ToThrift(TMinMaxFilter* thrift) const override;
+ virtual std::string DebugString() const override;
+
+ static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+ /// Struct name in LLVM IR.
+ static const char* LLVM_CLASS_NAME;
+
+ private:
+ /// Copies the contents of 'value' into 'buffer', up to 'len', and reassigns 'value' to
+ /// point to 'buffer'. If an oom is hit, disables the filter by setting 'always_true_'
+ /// to true.
+ void CopyToBuffer(StringBuffer* buffer, StringValue* value, int64_t len);
+
+ /// Sets 'always_true_' to true and clears the values of 'min_', 'max_', 'min_buffer_',
+ /// and 'max_buffer_'.
+ void SetAlwaysTrue();
+
+ /// The maximum length of string to store in 'min_buffer_' or 'max_buffer_'. Strings
+ /// inserted into this filter that are longer than this will be truncated.
+ static const int MAX_BOUND_LENGTH;
+
+ /// The min/max values. After a call to MaterializeValues() these will point to
+ /// 'min_buffer_'/'max_buffer_'.
+ StringValue min_;
+ StringValue max_;
+
+ /// Local buffers to copy min/max data into. If Insert() was called and 'min_'/'max_'
+ /// was updated, these will be empty until MaterializeValues() is called.
+ StringBuffer min_buffer_;
+ StringBuffer max_buffer_;
+
+ /// True if no rows have been inserted.
+ bool always_false_;
+ bool always_true_;
+};
+
+class TimestampMinMaxFilter : public MinMaxFilter {
+ public:
+ TimestampMinMaxFilter() { always_false_ = true; }
+ TimestampMinMaxFilter(const TMinMaxFilter& thrift);
+ virtual ~TimestampMinMaxFilter() {}
+
+ virtual void* GetMin() override { return &min_; }
+ virtual void* GetMax() override { return &max_; }
+ virtual PrimitiveType type() override;
+
+ virtual void Insert(void* val) override;
+ virtual bool AlwaysTrue() const override { return false; }
+ virtual bool AlwaysFalse() const override { return always_false_; }
+ virtual void ToThrift(TMinMaxFilter* thrift) const override;
+ virtual std::string DebugString() const override;
+
+ static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+ /// Struct name in LLVM IR.
+ static const char* LLVM_CLASS_NAME;
+
+ private:
+ TimestampValue min_;
+ TimestampValue max_;
+
+ /// True if no rows have been inserted.
+ bool always_false_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/Data.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Data.thrift b/common/thrift/Data.thrift
index 61d1988..6361e7e 100644
--- a/common/thrift/Data.thrift
+++ b/common/thrift/Data.thrift
@@ -28,6 +28,7 @@ struct TColumnValue {
4: optional double double_val
5: optional string string_val
8: optional binary binary_val
+ 9: optional binary timestamp_val
}
struct TResultRow {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1f261f4..1fd9c25 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -33,6 +33,7 @@ include "DataSinks.thrift"
include "Results.thrift"
include "RuntimeProfile.thrift"
include "ImpalaService.thrift"
+include "Data.thrift"
// constants for TQueryOptions.num_nodes
const i32 NUM_NODES_ALL = 0
@@ -191,14 +192,14 @@ struct TQueryOptions {
// be rounded up to the nearest power of two.
38: optional i32 runtime_bloom_filter_size = 1048576
- // Time in ms to wait until partition filters are delivered. If 0, the default defined
+ // Time in ms to wait until runtime filters are delivered. If 0, the default defined
// by the startup flag of the same name is used.
39: optional i32 runtime_filter_wait_time_ms = 0
// If true, per-row runtime filtering is disabled
40: optional bool disable_row_runtime_filtering = false
- // Maximum number of runtime filters allowed per query
+ // Maximum number of bloom runtime filters allowed per query
41: optional i32 max_num_runtime_filters = 10
// If true, use UTF-8 annotation for string columns. Note that char and varchar columns
@@ -226,10 +227,10 @@ struct TQueryOptions {
// the files there.
45: optional bool s3_skip_insert_staging = true
- // Minimum runtime filter size, in bytes
+ // Minimum runtime bloom filter size, in bytes
46: optional i32 runtime_filter_min_size = 1048576
- // Maximum runtime filter size, in bytes
+ // Maximum runtime bloom filter size, in bytes
47: optional i32 runtime_filter_max_size = 16777216
// Prefetching behavior during hash tables' building and probing.
@@ -771,6 +772,16 @@ struct TBloomFilter {
4: required bool always_false
}
+struct TMinMaxFilter {
+ // If true, filter allows all elements to pass and 'min'/'max' will not be set.
+ 1: required bool always_true
+
+ // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
+ 2: required bool always_false
+
+ 3: optional Data.TColumnValue min
+ 4: optional Data.TColumnValue max
+}
// UpdateFilter
@@ -787,6 +798,8 @@ struct TUpdateFilterParams {
// required in V1
4: optional TBloomFilter bloom_filter
+
+ 5: optional TMinMaxFilter min_max_filter
}
struct TUpdateFilterResult {
@@ -812,6 +825,8 @@ struct TPublishFilterParams {
// Actual bloom_filter payload
// required in V1
5: optional TBloomFilter bloom_filter
+
+ 6: optional TMinMaxFilter min_max_filter
}
struct TPublishFilterResult {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index b8a073e..061da00 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -199,13 +199,13 @@ enum TImpalaQueryOptions {
// two.
RUNTIME_BLOOM_FILTER_SIZE,
- // Time (in ms) to wait in scans for partition filters to arrive.
+ // Time (in ms) to wait in scans for runtime filters to arrive.
RUNTIME_FILTER_WAIT_TIME_MS,
// If true, disable application of runtime filters to individual rows.
DISABLE_ROW_RUNTIME_FILTERING,
- // Maximum number of runtime filters allowed per query.
+ // Maximum number of bloom runtime filters allowed per query.
MAX_NUM_RUNTIME_FILTERS,
// If true, use UTF-8 annotation for string columns. Note that char and varchar columns
@@ -227,10 +227,10 @@ enum TImpalaQueryOptions {
// TODO: Find a way to get this working for INSERT OVERWRITEs too.
S3_SKIP_INSERT_STAGING,
- // Maximum runtime filter size, in bytes.
+ // Maximum runtime bloom filter size, in bytes.
RUNTIME_FILTER_MAX_SIZE,
- // Minimum runtime filter size, in bytes.
+ // Minimum runtime bloom filter size, in bytes.
RUNTIME_FILTER_MIN_SIZE,
// Prefetching behavior during hash tables' building and probing.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c04d08a..97ef1b3 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -98,6 +98,16 @@ struct TRuntimeFilterTargetDesc {
// Indicates if this target is on the same fragment as the join that
// produced the runtime filter
5: required bool is_local_target
+
+ // If the target node is a Kudu scan node, the name of the targeted column, in the
+ // letter case in which it appears in Kudu, and the column's type.
+ 6: optional string kudu_col_name
+ 7: optional Types.TColumnType kudu_col_type;
+}
+
+enum TRuntimeFilterType {
+ BLOOM,
+ MIN_MAX
}
// Specification of a runtime filter.
@@ -132,6 +142,9 @@ struct TRuntimeFilterDesc {
// The estimated number of distinct values that the planner expects the filter to hold.
// Used to compute the size of the filter.
9: optional i64 ndv_estimate
+
+ // The type of runtime filter to build.
+ 10: required TRuntimeFilterType type
}
// The information contained in subclasses of ScanNode captured in two separate
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 917d918..d04a15e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -189,7 +189,7 @@ public class HashJoinNode extends JoinNode {
}
if (!runtimeFilters_.isEmpty()) {
output.append(detailPrefix + "runtime filters: ");
- output.append(getRuntimeFilterExplainString(true));
+ output.append(getRuntimeFilterExplainString(true, detailLevel));
}
}
return output.toString();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ad8501a..44f58eb 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -978,7 +978,7 @@ public class HdfsScanNode extends ScanNode {
}
if (!runtimeFilters_.isEmpty()) {
output.append(detailPrefix + "runtime filters: ");
- output.append(getRuntimeFilterExplainString(false));
+ output.append(getRuntimeFilterExplainString(false, detailLevel));
}
}
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index cbc132b..390592e 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -295,6 +295,10 @@ public class KuduScanNode extends ScanNode {
result.append(detailPrefix + "kudu predicates: " + getExplainString(
kuduConjuncts_) + "\n");
}
+ if (!runtimeFilters_.isEmpty()) {
+ result.append(detailPrefix + "runtime filters: ");
+ result.append(getRuntimeFilterExplainString(false, detailLevel));
+ }
}
}
return result.toString();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index c149b5c..a14c89a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -705,25 +705,28 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; }
- protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+ protected String getRuntimeFilterExplainString(
+ boolean isBuildNode, TExplainLevel detailLevel) {
if (runtimeFilters_.isEmpty()) return "";
- final String applyNodeFilterFormat = "%s -> %s";
- final String buildNodeFilterFormat = "%s <- %s";
- String format = isBuildNode ? buildNodeFilterFormat : applyNodeFilterFormat;
- StringBuilder output = new StringBuilder();
List<String> filtersStr = Lists.newArrayList();
for (RuntimeFilter filter: runtimeFilters_) {
- Expr expr = null;
+ StringBuilder filterStr = new StringBuilder();
+ filterStr.append(filter.getFilterId());
+ if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+ filterStr.append("[");
+ filterStr.append(filter.getType().toString().toLowerCase());
+ filterStr.append("]");
+ }
if (isBuildNode) {
- expr = filter.getSrcExpr();
+ filterStr.append(" <- ");
+ filterStr.append(filter.getSrcExpr().toSql());
} else {
- expr = filter.getTargetExpr(getId());
+ filterStr.append(" -> ");
+ filterStr.append(filter.getTargetExpr(getId()).toSql());
}
- Preconditions.checkNotNull(expr);
- filtersStr.add(String.format(format, filter.getFilterId(), expr.toSql()));
+ filtersStr.add(filterStr.toString());
}
- output.append(Joiner.on(", ").join(filtersStr) + "\n");
- return output.toString();
+ return Joiner.on(", ").join(filtersStr) + "\n";
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 98365e6..758e79d 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -26,6 +26,8 @@ import java.util.Set;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.analysis.CastExpr;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.Predicate;
@@ -35,6 +37,7 @@ import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.analysis.TupleIsNullPredicate;
+import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
@@ -44,6 +47,7 @@ import org.apache.impala.planner.PlanNode;
import org.apache.impala.thrift.TRuntimeFilterDesc;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
+import org.apache.impala.thrift.TRuntimeFilterType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -107,6 +111,8 @@ public final class RuntimeFilterGenerator {
private final Expr srcExpr_;
// Expr (lhs of join predicate) from which the targetExprs_ are generated.
private final Expr origTargetExpr_;
+ // The operator comparing 'srcExpr_' and 'origTargetExpr_'.
+ private final Operator exprCmpOp_;
// Runtime filter targets
private final List<RuntimeFilterTarget> targets_ = Lists.newArrayList();
// Slots from base table tuples that have value transfer from the slots
@@ -131,6 +137,8 @@ public final class RuntimeFilterGenerator {
// If set, indicates that the filter can't be assigned to another scan node.
// Once set, it can't be unset.
private boolean finalized_ = false;
+ // The type of filter to build.
+ private TRuntimeFilterType type_;
/**
* Internal representation of a runtime filter target.
@@ -165,6 +173,14 @@ public final class RuntimeFilterGenerator {
tFilterTarget.setTarget_expr_slotids(tSlotIds);
tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
tFilterTarget.setIs_local_target(isLocalTarget);
+ if (node instanceof KuduScanNode) {
+ // assignRuntimeFilters() only assigns KuduScanNode targets if the target expr
+ // is a slot ref, possibly with an implicit cast, pointing to a column.
+ SlotRef slotRef = expr.unwrapSlotRef(true);
+ KuduColumn col = (KuduColumn) slotRef.getDesc().getColumn();
+ tFilterTarget.setKudu_col_name(col.getKuduName());
+ tFilterTarget.setKudu_col_type(col.getType().toThrift());
+ }
return tFilterTarget;
}
@@ -179,13 +195,16 @@ public final class RuntimeFilterGenerator {
}
}
- private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode,
- Expr srcExpr, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots) {
+ private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
+ Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
+ TRuntimeFilterType type) {
id_ = filterId;
src_ = filterSrcNode;
srcExpr_ = srcExpr;
origTargetExpr_ = origTargetExpr;
+ exprCmpOp_ = exprCmpOp;
targetSlotsByTid_ = targetSlots;
+ type_ = type;
computeNdvEstimate();
}
@@ -221,6 +240,7 @@ public final class RuntimeFilterGenerator {
appliedOnPartitionColumns && target.isBoundByPartitionColumns;
}
tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
+ tFilter.setType(type_);
return tFilter;
}
@@ -230,7 +250,8 @@ public final class RuntimeFilterGenerator {
* or null if a runtime filter cannot be generated from the specified predicate.
*/
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
- Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode) {
+ Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
+ TRuntimeFilterType type) {
Preconditions.checkNotNull(idGen);
Preconditions.checkNotNull(joinPredicate);
Preconditions.checkNotNull(filterSrcNode);
@@ -256,8 +277,8 @@ public final class RuntimeFilterGenerator {
if (LOG.isTraceEnabled()) {
LOG.trace("Generating runtime filter from predicate " + joinPredicate);
}
- return new RuntimeFilter(idGen.getNextId(), filterSrcNode,
- srcExpr, targetExpr, targetSlots);
+ return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
+ normalizedJoinConjunct.getOp(), targetSlots, type);
}
/**
@@ -337,6 +358,8 @@ public final class RuntimeFilterGenerator {
public Expr getOrigTargetExpr() { return origTargetExpr_; }
public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid_; }
public RuntimeFilterId getFilterId() { return id_; }
+ public TRuntimeFilterType getType() { return type_; }
+ public Operator getExprCompOp() { return exprCmpOp_; }
/**
* Estimates the selectivity of a runtime filter as the cardinality of the
@@ -394,14 +417,14 @@ public final class RuntimeFilterGenerator {
public static void generateRuntimeFilters(PlannerContext ctx, PlanNode plan) {
Preconditions.checkNotNull(ctx);
Preconditions.checkNotNull(ctx.getQueryOptions());
- int maxNumFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
- Preconditions.checkState(maxNumFilters >= 0);
+ int maxNumBloomFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
+ Preconditions.checkState(maxNumBloomFilters >= 0);
RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
filterGenerator.generateFilters(ctx, plan);
List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
- if (filters.size() > maxNumFilters) {
- // If more than 'maxNumFilters' were generated, sort them by increasing selectivity
- // and keep the 'maxNumFilters' most selective.
+ if (filters.size() > maxNumBloomFilters) {
+ // If more than 'maxNumBloomFilters' were generated, sort them by increasing
+ // selectivity and keep the 'maxNumBloomFilters' most selective bloom filters.
Collections.sort(filters, new Comparator<RuntimeFilter>() {
public int compare(RuntimeFilter a, RuntimeFilter b) {
double aSelectivity =
@@ -413,8 +436,14 @@ public final class RuntimeFilterGenerator {
}
);
}
- for (RuntimeFilter filter:
- filters.subList(0, Math.min(filters.size(), maxNumFilters))) {
+ // We only enforce a limit on the number of bloom filters as they are much more
+ // heavy-weight than the other filter types.
+ int numBloomFilters = 0;
+ for (RuntimeFilter filter : filters) {
+ if (filter.getType() == TRuntimeFilterType.BLOOM) {
+ if (numBloomFilters >= maxNumBloomFilters) continue;
+ ++numBloomFilters;
+ }
filter.setIsBroadcast(
filter.src_.getDistributionMode() == DistributionMode.BROADCAST);
filter.computeHasLocalTargets();
@@ -462,12 +491,14 @@ public final class RuntimeFilterGenerator {
}
joinConjuncts.addAll(joinNode.getConjuncts());
List<RuntimeFilter> filters = Lists.newArrayList();
- for (Expr conjunct: joinConjuncts) {
- RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
- ctx.getRootAnalyzer(), conjunct, joinNode);
- if (filter == null) continue;
- registerRuntimeFilter(filter);
- filters.add(filter);
+ for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
+ for (Expr conjunct : joinConjuncts) {
+ RuntimeFilter filter = RuntimeFilter.create(
+ filterIdGenerator, ctx.getRootAnalyzer(), conjunct, joinNode, type);
+ if (filter == null) continue;
+ registerRuntimeFilter(filter);
+ filters.add(filter);
+ }
}
generateFilters(ctx, root.getChild(0));
// Finalize every runtime filter of that join. This is to ensure that we don't
@@ -538,11 +569,14 @@ public final class RuntimeFilterGenerator {
* 2. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned
* to 'scanNode' if the filter is produced within the same fragment that contains the
* scan node.
+ * 3. Only Hdfs and Kudu scan nodes are supported:
+ * a. If the target is an HdfsScanNode, the filter must be type BLOOM.
+ * b. If the target is a KuduScanNode, the filter must be type MIN_MAX, the target
+ * must be a slot ref on a column, and the comp op cannot be 'not distinct'.
* A scan node may be used as a destination node for multiple runtime filters.
- * Currently, runtime filters can only be assigned to HdfsScanNodes.
*/
private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
- if (!(scanNode instanceof HdfsScanNode)) return;
+ if (!(scanNode instanceof HdfsScanNode || scanNode instanceof KuduScanNode)) return;
TupleId tid = scanNode.getTupleIds().get(0);
if (!runtimeFiltersByTid_.containsKey(tid)) return;
Analyzer analyzer = ctx.getRootAnalyzer();
@@ -558,6 +592,26 @@ public final class RuntimeFilterGenerator {
if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
boolean isLocalTarget = isLocalTarget(filter, scanNode);
if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;
+
+ // Check that the scan node supports applying filters of this type and targetExpr.
+ if (scanNode instanceof HdfsScanNode
+ && filter.getType() != TRuntimeFilterType.BLOOM) {
+ continue;
+ } else if (scanNode instanceof KuduScanNode) {
+ if (filter.getType() != TRuntimeFilterType.MIN_MAX) continue;
+ SlotRef slotRef = targetExpr.unwrapSlotRef(true);
+ // Kudu only supports targeting a single column, not general exprs, so the target
+ // must be a SlotRef pointing to a column. We can allow implicit integer casts
+ // by casting the min/max values before sending them to Kudu.
+ // Kudu also cannot currently return nulls if a filter is applied, so it does not
+ // work with "is not distinct".
+ if (slotRef == null || slotRef.getDesc().getColumn() == null
+ || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
+ || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
+ continue;
+ }
+ }
+
RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget);
filter.addTarget(target);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index f49e39c..760334d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -497,4 +497,11 @@ public class PlannerTest extends PlannerTestBase {
requestWithDisableSpillOn = frontend_.createExecRequest(queryCtx, explainBuilder);
Assert.assertNotNull(requestWithDisableSpillOn);
}
+
+ @Test
+ public void testMinMaxRuntimeFilters() {
+ TQueryOptions options = defaultQueryOptions();
+ options.setExplain_level(TExplainLevel.EXTENDED);
+ runPlannerTestFile("min-max-runtime-filters", options);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 15db74d..cf5a78b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -1245,7 +1245,7 @@ PLAN-ROOT SINK
|
03:HASH JOIN [INNER JOIN, PARTITIONED]
| hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
-| runtime filters: RF002 <- o_orderkey, RF003 <- o_clerk
+| runtime filters: RF004 <- o_orderkey, RF005 <- o_clerk
|
|--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
| |
@@ -1257,7 +1257,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpch_parquet.lineitem]
partitions=1/1 files=3 size=193.92MB
- runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag
+ runtime filters: RF004 -> l_orderkey, RF005 -> l_returnflag
====
# IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a
# tuple that is made nullable in the join fragment.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
index 9dc9f22..5571b21 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
@@ -12,7 +12,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_customer_sk = c_customer_sk
| fk/pk conjuncts: ss_customer_sk = c_customer_sk
-| runtime filters: RF000 <- c_customer_sk
+| runtime filters: RF000[bloom] <- c_customer_sk
| mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB
| tuple-ids=0,1 row-size=355B cardinality=529700
|
@@ -28,7 +28,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_customer_sk
+ runtime filters: RF000[bloom] -> ss_customer_sk
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -87,7 +87,7 @@ PLAN-ROOT SINK
02:HASH JOIN [RIGHT OUTER JOIN]
| hash predicates: ss_customer_sk = c_customer_sk
| fk/pk conjuncts: ss_customer_sk = c_customer_sk
-| runtime filters: RF000 <- c_customer_sk
+| runtime filters: RF000[bloom] <- c_customer_sk
| mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB
| tuple-ids=0N,1 row-size=355B cardinality=529700
|
@@ -103,7 +103,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_customer_sk
+ runtime filters: RF000[bloom] -> ss_customer_sk
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -124,7 +124,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
| fk/pk conjuncts: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
-| runtime filters: RF000 <- sr_item_sk, RF001 <- sr_ticket_number
+| runtime filters: RF000[bloom] <- sr_item_sk, RF001[bloom] <- sr_ticket_number
| mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
| tuple-ids=0,1 row-size=188B cardinality=211838
|
@@ -140,7 +140,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_item_sk, RF001 -> ss_ticket_number
+ runtime filters: RF000[bloom] -> ss_item_sk, RF001[bloom] -> ss_ticket_number
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -160,7 +160,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_sold_time_sk = ws_sold_time_sk
| fk/pk conjuncts: none
-| runtime filters: RF000 <- ws_sold_time_sk
+| runtime filters: RF000[bloom] <- ws_sold_time_sk
| mem-estimate=108.67MB mem-reservation=34.00MB spill-buffer=2.00MB
| tuple-ids=0,1 row-size=244B cardinality=44136418
|
@@ -174,7 +174,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_sold_time_sk
+ runtime filters: RF000[bloom] -> ss_sold_time_sk
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -195,7 +195,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: b.d_date_sk = a.d_date_sk
| fk/pk conjuncts: b.d_date_sk = a.d_date_sk
-| runtime filters: RF000 <- a.d_date_sk
+| runtime filters: RF000[bloom] <- a.d_date_sk
| mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB
| tuple-ids=1,0 row-size=606B cardinality=36525
|
@@ -211,7 +211,7 @@ PLAN-ROOT SINK
|
01:SCAN HDFS [tpcds.date_dim b]
partitions=1/1 files=1 size=9.84MB
- runtime filters: RF000 -> b.d_date_sk
+ runtime filters: RF000[bloom] -> b.d_date_sk
stats-rows=73049 extrapolated-rows=disabled
table stats: rows=73049 size=9.84MB
column stats: all
@@ -236,7 +236,7 @@ PLAN-ROOT SINK
08:HASH JOIN [INNER JOIN]
| hash predicates: ss_addr_sk = c_current_addr_sk
| fk/pk conjuncts: none
-| runtime filters: RF000 <- c_current_addr_sk
+| runtime filters: RF000[bloom] <- c_current_addr_sk
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=1,0,3,4,2 row-size=60B cardinality=19358
|
@@ -251,7 +251,7 @@ PLAN-ROOT SINK
07:HASH JOIN [INNER JOIN]
| hash predicates: sr_returned_date_sk = d2.d_date_sk
| fk/pk conjuncts: sr_returned_date_sk = d2.d_date_sk
-| runtime filters: RF001 <- d2.d_date_sk
+| runtime filters: RF002[bloom] <- d2.d_date_sk
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=1,0,3,4 row-size=56B cardinality=8131
|
@@ -266,14 +266,14 @@ PLAN-ROOT SINK
06:HASH JOIN [INNER JOIN]
| hash predicates: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
| fk/pk conjuncts: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
-| runtime filters: RF002 <- ss_item_sk, RF003 <- ss_ticket_number
+| runtime filters: RF004[bloom] <- ss_item_sk, RF005[bloom] <- ss_ticket_number
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=1,0,3 row-size=52B cardinality=8131
|
|--05:HASH JOIN [INNER JOIN]
| | hash predicates: ss_sold_date_sk = d1.d_date_sk
| | fk/pk conjuncts: ss_sold_date_sk = d1.d_date_sk
-| | runtime filters: RF004 <- d1.d_date_sk
+| | runtime filters: RF008[bloom] <- d1.d_date_sk
| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| | tuple-ids=0,3 row-size=32B cardinality=11055
| |
@@ -289,7 +289,7 @@ PLAN-ROOT SINK
| |
| 00:SCAN HDFS [tpcds.store_sales]
| partitions=1824/1824 files=1824 size=326.32MB
-| runtime filters: RF000 -> ss_addr_sk, RF004 -> ss_sold_date_sk
+| runtime filters: RF000[bloom] -> ss_addr_sk, RF008[bloom] -> ss_sold_date_sk
| stats-rows=2880404 extrapolated-rows=disabled
| table stats: rows=2880404 size=326.32MB
| column stats: all
@@ -298,7 +298,7 @@ PLAN-ROOT SINK
|
01:SCAN HDFS [tpcds.store_returns]
partitions=1/1 files=1 size=31.19MB
- runtime filters: RF001 -> sr_returned_date_sk, RF002 -> sr_item_sk, RF003 -> sr_ticket_number
+ runtime filters: RF002[bloom] -> sr_returned_date_sk, RF004[bloom] -> sr_item_sk, RF005[bloom] -> sr_ticket_number
stats-rows=287514 extrapolated-rows=disabled
table stats: rows=287514 size=31.19MB
column stats: all
@@ -318,7 +318,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_customer_sk % 10 = c_customer_sk / 100
| fk/pk conjuncts: assumed fk/pk
-| runtime filters: RF000 <- c_customer_sk / 100
+| runtime filters: RF000[bloom] <- c_customer_sk / 100
| mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB
| tuple-ids=0,1 row-size=355B cardinality=2880404
|
@@ -332,7 +332,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_customer_sk % 10
+ runtime filters: RF000[bloom] -> ss_customer_sk % 10
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -353,7 +353,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_customer_sk = c_customer_sk
| fk/pk conjuncts: assumed fk/pk
-| runtime filters: RF000 <- c_customer_sk
+| runtime filters: RF000[bloom] <- c_customer_sk
| mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB
| tuple-ids=0,1 row-size=8B cardinality=2880404
|
@@ -367,7 +367,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_customer_sk
+ runtime filters: RF000[bloom] -> ss_customer_sk
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
@@ -387,7 +387,7 @@ PLAN-ROOT SINK
02:HASH JOIN [INNER JOIN]
| hash predicates: ss_customer_sk = c_customer_sk
| fk/pk conjuncts: assumed fk/pk
-| runtime filters: RF000 <- c_customer_sk
+| runtime filters: RF000[bloom] <- c_customer_sk
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=0,1 row-size=8B cardinality=unavailable
|
@@ -401,7 +401,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds_seq_snap.store_sales]
partitions=1824/1824 files=1824 size=207.85MB
- runtime filters: RF000 -> ss_customer_sk
+ runtime filters: RF000[bloom] -> ss_customer_sk
stats-rows=unavailable extrapolated-rows=disabled
table stats: rows=unavailable size=unavailable
column stats: unavailable
@@ -423,7 +423,7 @@ PLAN-ROOT SINK
03:HASH JOIN [INNER JOIN]
| hash predicates: ss_sold_time_sk = ws_sold_time_sk
| fk/pk conjuncts: none
-| runtime filters: RF000 <- ws_sold_time_sk
+| runtime filters: RF000[bloom] <- ws_sold_time_sk
| mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
| tuple-ids=0,2 row-size=104B cardinality=2440073
|
@@ -442,7 +442,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [tpcds.store_sales]
partitions=1824/1824 files=1824 size=326.32MB
- runtime filters: RF000 -> ss_sold_time_sk
+ runtime filters: RF000[bloom] -> ss_sold_time_sk
stats-rows=2880404 extrapolated-rows=disabled
table stats: rows=2880404 size=326.32MB
column stats: all
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
index e5aa5e8..184fd1c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
@@ -193,14 +193,14 @@ PLAN-ROOT SINK
| |
| |--04:HASH JOIN [INNER JOIN]
| | | hash predicates: b.id = c.id
-| | | runtime filters: RF001 <- c.id
+| | | runtime filters: RF002 <- c.id
| | |
| | |--02:SCAN HDFS [functional.alltypestiny c]
| | | partitions=4/4 files=4 size=460B
| | |
| | 01:SCAN HDFS [functional.alltypes b]
| | partitions=24/24 files=24 size=478.45KB
-| | runtime filters: RF001 -> b.id
+| | runtime filters: RF002 -> b.id
| |
| 00:SCAN HDFS [functional.alltypestiny a]
| partitions=4/4 files=4 size=460B
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
index 556ba65..c75c02d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
@@ -391,7 +391,7 @@ PLAN-ROOT SINK
|
|--03:HASH JOIN [INNER JOIN]
| | hash predicates: a.id = functional.alltypestiny.id
-| | runtime filters: RF001 <- functional.alltypestiny.id
+| | runtime filters: RF002 <- functional.alltypestiny.id
| | limit: 10
| |
| |--02:SCAN HDFS [functional.alltypestiny]
@@ -399,7 +399,7 @@ PLAN-ROOT SINK
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
-| runtime filters: RF001 -> a.id
+| runtime filters: RF002 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
@@ -426,7 +426,7 @@ PLAN-ROOT SINK
| |
| 03:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: a.id = functional.alltypestiny.id
-| | runtime filters: RF001 <- functional.alltypestiny.id
+| | runtime filters: RF002 <- functional.alltypestiny.id
| | limit: 10
| |
| |--06:EXCHANGE [BROADCAST]
@@ -436,7 +436,7 @@ PLAN-ROOT SINK
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
-| runtime filters: RF001 -> a.id
+| runtime filters: RF002 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
@@ -470,14 +470,14 @@ PLAN-ROOT SINK
| |
| 03:HASH JOIN [INNER JOIN]
| | hash predicates: a.id = functional.alltypestiny.id
-| | runtime filters: RF001 <- functional.alltypestiny.id
+| | runtime filters: RF002 <- functional.alltypestiny.id
| |
| |--02:SCAN HDFS [functional.alltypestiny]
| | partitions=4/4 files=4 size=460B
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
-| runtime filters: RF001 -> a.id
+| runtime filters: RF002 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
@@ -508,7 +508,7 @@ PLAN-ROOT SINK
| |
| 03:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: a.id = functional.alltypestiny.id
-| | runtime filters: RF001 <- functional.alltypestiny.id
+| | runtime filters: RF002 <- functional.alltypestiny.id
| |
| |--07:EXCHANGE [BROADCAST]
| | |
@@ -517,7 +517,7 @@ PLAN-ROOT SINK
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
-| runtime filters: RF001 -> a.id
+| runtime filters: RF002 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB