You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:49 UTC

[06/16] incubator-impala git commit: IMPALA-4252: Min-max runtime filters for Kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter-ir.cc b/be/src/util/min-max-filter-ir.cc
new file mode 100644
index 0000000..130d11d
--- /dev/null
+++ b/be/src/util/min-max-filter-ir.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/min-max-filter.h"
+
+#include "runtime/string-value.inline.h"
+
+using std::string;
+
+namespace impala {
+
+// Defines NAME##MinMaxFilter::Insert() for a filter whose bounds are compared and
+// assigned as plain TYPE values. 'val' points at a TYPE; a nullptr 'val' is ignored.
+// Otherwise 'min_' is lowered and/or 'max_' is raised to cover the inserted value.
+#define NUMERIC_MIN_MAX_FILTER_INSERT(NAME, TYPE)                  \
+  void NAME##MinMaxFilter::Insert(void* val) {                     \
+    if (val != nullptr) {                                          \
+      const TYPE& inserted = *reinterpret_cast<const TYPE*>(val);  \
+      if (inserted < min_) min_ = inserted;                        \
+      if (inserted > max_) max_ = inserted;                        \
+    }                                                              \
+  }
+
+NUMERIC_MIN_MAX_FILTER_INSERT(Bool, bool);
+NUMERIC_MIN_MAX_FILTER_INSERT(TinyInt, int8_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(SmallInt, int16_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(Int, int32_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(BigInt, int64_t);
+NUMERIC_MIN_MAX_FILTER_INSERT(Float, float);
+NUMERIC_MIN_MAX_FILTER_INSERT(Double, double);
+
+// Folds the StringValue pointed to by 'val' into the filter's bounds. Does nothing if
+// 'val' is nullptr or the filter is already always-true. The bounds are assigned from
+// '*val' directly; whenever a bound is replaced, the matching buffer is cleared.
+void StringMinMaxFilter::Insert(void* val) {
+  if (always_true_ || val == nullptr) return;
+  const StringValue& inserted = *reinterpret_cast<const StringValue*>(val);
+  if (always_false_) {
+    // First value seen: it is both the lower and the upper bound.
+    min_ = inserted;
+    max_ = inserted;
+    always_false_ = false;
+    return;
+  }
+  if (inserted < min_) {
+    min_ = inserted;
+    min_buffer_.Clear();
+  } else if (inserted > max_) {
+    max_ = inserted;
+    max_buffer_.Clear();
+  }
+}
+
+// Folds the TimestampValue pointed to by 'val' into the filter's bounds. A nullptr 'val'
+// is ignored. The first inserted value becomes both bounds and clears 'always_false_';
+// later values only move whichever bound they fall outside of.
+void TimestampMinMaxFilter::Insert(void* val) {
+  if (val == nullptr) return;
+  const TimestampValue& inserted = *reinterpret_cast<const TimestampValue*>(val);
+  if (always_false_) {
+    min_ = inserted;
+    max_ = inserted;
+    always_false_ = false;
+    return;
+  }
+  if (inserted < min_) {
+    min_ = inserted;
+  } else if (inserted > max_) {
+    max_ = inserted;
+  }
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
new file mode 100644
index 0000000..23712e6
--- /dev/null
+++ b/be/src/util/min-max-filter-test.cc
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+#include "util/min-max-filter.h"
+
+#include "runtime/string-value.inline.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
+#include "util/test-info.h"
+
+DECLARE_bool(enable_webserver);
+
+using namespace impala;
+
+// Exercises BoolMinMaxFilter: checks the min/max reported after each insert, and that
+// MinMaxFilter::Or merges the bounds of two bool thrift filters.
+TEST(MinMaxFilterTest, TestBoolMinMaxFilter) {
+  MemTracker mem_tracker;
+  MemPool mem_pool(&mem_tracker);
+  ObjectPool obj_pool;
+
+  MinMaxFilter* bool_filter =
+      MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &obj_pool, &mem_pool);
+  // Nothing has been inserted yet.
+  EXPECT_TRUE(bool_filter->AlwaysFalse());
+
+  // A single 'true' is both the min and the max.
+  bool true_val = true;
+  bool_filter->Insert(&true_val);
+  EXPECT_EQ(*reinterpret_cast<bool*>(bool_filter->GetMin()), true_val);
+  EXPECT_EQ(*reinterpret_cast<bool*>(bool_filter->GetMax()), true_val);
+  EXPECT_FALSE(bool_filter->AlwaysFalse());
+
+  // Adding 'false' lowers the min and leaves the max alone.
+  bool false_val = false;
+  bool_filter->Insert(&false_val);
+  EXPECT_EQ(*reinterpret_cast<bool*>(bool_filter->GetMin()), false_val);
+  EXPECT_EQ(*reinterpret_cast<bool*>(bool_filter->GetMax()), true_val);
+
+  // Or() of [false, true] into [false, false] must yield [false, true].
+  TMinMaxFilter in_filter;
+  in_filter.min.__set_bool_val(false);
+  in_filter.max.__set_bool_val(true);
+  TMinMaxFilter out_filter;
+  out_filter.min.__set_bool_val(false);
+  out_filter.max.__set_bool_val(false);
+  MinMaxFilter::Or(in_filter, &out_filter);
+  EXPECT_FALSE(out_filter.min.bool_val);
+  EXPECT_TRUE(out_filter.max.bool_val);
+}
+
+// Expects that 'filter' reports the int32_t bounds 'min' and 'max' and that it is neither
+// always-false nor always-true.
+void CheckIntVals(MinMaxFilter* filter, int32_t min, int32_t max) {
+  const int32_t actual_min = *reinterpret_cast<int32_t*>(filter->GetMin());
+  const int32_t actual_max = *reinterpret_cast<int32_t*>(filter->GetMax());
+  EXPECT_EQ(actual_min, min);
+  EXPECT_EQ(actual_max, max);
+  EXPECT_FALSE(filter->AlwaysFalse());
+  EXPECT_FALSE(filter->AlwaysTrue());
+}
+
+// Tests that an IntMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for ints.
+// This also provides coverage for the other numeric MinMaxFilter types as they're
+// generated with macros and the logic is identical.
+TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
+  MemTracker mem_tracker;
+  MemPool mem_pool(&mem_tracker);
+  ObjectPool obj_pool;
+
+  ColumnType int_type(PrimitiveType::TYPE_INT);
+  MinMaxFilter* int_filter = MinMaxFilter::Create(int_type, &obj_pool, &mem_pool);
+
+  // Test the behavior of an empty filter.
+  EXPECT_TRUE(int_filter->AlwaysFalse());
+  EXPECT_FALSE(int_filter->AlwaysTrue());
+  TMinMaxFilter tFilter;
+  int_filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_FALSE(tFilter.min.__isset.int_val);
+  EXPECT_FALSE(tFilter.max.__isset.int_val);
+  // A filter rebuilt from the empty thrift representation must also be empty.
+  MinMaxFilter* empty_filter =
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+  EXPECT_TRUE(empty_filter->AlwaysFalse());
+  EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+  // Now insert some stuff.
+  int32_t i1 = 10;
+  int_filter->Insert(&i1);
+  CheckIntVals(int_filter, i1, i1);
+  int32_t i2 = 15;
+  int_filter->Insert(&i2);
+  CheckIntVals(int_filter, i1, i2);
+  // A value inside the current range leaves both bounds unchanged.
+  int32_t i3 = 12;
+  int_filter->Insert(&i3);
+  CheckIntVals(int_filter, i1, i2);
+  int32_t i4 = 8;
+  int_filter->Insert(&i4);
+  CheckIntVals(int_filter, i4, i2);
+
+  // Round-trip the populated filter through its thrift representation.
+  int_filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.int_val, i4);
+  EXPECT_EQ(tFilter.max.int_val, i2);
+  MinMaxFilter* int_filter2 =
+      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_pool);
+  CheckIntVals(int_filter2, i4, i2);
+
+  // Check the behavior of Or.
+  TMinMaxFilter tFilter1;
+  tFilter1.min.__set_int_val(4);
+  tFilter1.max.__set_int_val(8);
+  TMinMaxFilter tFilter2;
+  tFilter2.min.__set_int_val(2);
+  tFilter2.max.__set_int_val(7);
+  MinMaxFilter::Or(tFilter1, &tFilter2);
+  EXPECT_EQ(tFilter2.min.int_val, 2);
+  EXPECT_EQ(tFilter2.max.int_val, 8);
+}
+
+// Expects that 'filter' reports StringValue bounds equal to 'min' and 'max' and that it
+// is neither always-true nor always-false.
+void CheckStringVals(MinMaxFilter* filter, const string& min, const string& max) {
+  const StringValue& found_min = *reinterpret_cast<StringValue*>(filter->GetMin());
+  const StringValue& found_max = *reinterpret_cast<StringValue*>(filter->GetMax());
+  EXPECT_EQ(found_min, StringValue(min));
+  EXPECT_EQ(found_max, StringValue(max));
+  EXPECT_FALSE(filter->AlwaysTrue());
+  EXPECT_FALSE(filter->AlwaysFalse());
+}
+
+// Tests that a StringMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for strings.
+// Also tests truncation behavior when inserted strings are larger than MAX_BOUND_LENGTH
+// and that the filter is disabled if there's not enough mem to store the min/max.
+TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
+  ObjectPool obj_pool;
+  MemTracker mem_tracker;
+  MemPool mem_pool(&mem_tracker);
+
+  ColumnType string_type(PrimitiveType::TYPE_STRING);
+  MinMaxFilter* filter = MinMaxFilter::Create(string_type, &obj_pool, &mem_pool);
+
+  // Test the behavior of an empty filter.
+  EXPECT_TRUE(filter->AlwaysFalse());
+  EXPECT_FALSE(filter->AlwaysTrue());
+  // Materializing an empty filter must not change its state.
+  filter->MaterializeValues();
+  EXPECT_TRUE(filter->AlwaysFalse());
+  EXPECT_FALSE(filter->AlwaysTrue());
+  TMinMaxFilter tFilter;
+  filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+
+  MinMaxFilter* empty_filter =
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+  EXPECT_TRUE(empty_filter->AlwaysFalse());
+  EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+  // Now insert some stuff.
+  string c = "c";
+  StringValue cVal(c);
+  filter->Insert(&cVal);
+  filter->MaterializeValues();
+  CheckStringVals(filter, c, c);
+
+  string d = "d";
+  StringValue dVal(d);
+  filter->Insert(&dVal);
+  filter->MaterializeValues();
+  CheckStringVals(filter, c, d);
+
+  // "cc" sorts between "c" and "d", so neither bound moves.
+  string cc = "cc";
+  StringValue ccVal(cc);
+  filter->Insert(&ccVal);
+  filter->MaterializeValues();
+  CheckStringVals(filter, c, d);
+
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.string_val, c);
+  EXPECT_EQ(tFilter.max.string_val, d);
+
+  // Test that strings longer than 1024 are truncated.
+  string b1030(1030, 'b');
+  StringValue b1030Val(b1030);
+  filter->Insert(&b1030Val);
+  filter->MaterializeValues();
+  string b1024(1024, 'b');
+  CheckStringVals(filter, b1024, d);
+
+  string e1030(1030, 'e');
+  StringValue e1030Val(e1030);
+  filter->Insert(&e1030Val);
+  filter->MaterializeValues();
+  string e1024(1024, 'e');
+  // For max, after truncating the final char is increased by one.
+  e1024[1023] = 'f';
+  CheckStringVals(filter, b1024, e1024);
+
+  // Build a string of 'f's whose chars from index 1020 onwards are all the max char (-1).
+  string trailMaxChar(1030, 'f');
+  int trailIndex = 1020;
+  for (int i = trailIndex; i < 1030; ++i) trailMaxChar[i] = -1;
+  StringValue trailMaxCharVal(trailMaxChar);
+  filter->Insert(&trailMaxCharVal);
+  filter->MaterializeValues();
+  // Check that when adding one for max, if the final char is the max char it overflows
+  // and carries.
+  string truncTrailMaxChar(1024, 'f');
+  truncTrailMaxChar[trailIndex - 1] = 'g';
+  for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
+  CheckStringVals(filter, b1024, truncTrailMaxChar);
+
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(tFilter.min.string_val, b1024);
+  EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
+
+  // A filter rebuilt from thrift must report the same bounds.
+  MinMaxFilter* filter2 =
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+  CheckStringVals(filter2, b1024, truncTrailMaxChar);
+
+  // Check that if the entire string is the max char and therefore after truncating for
+  // max we can't add one, the filter is disabled.
+  string allMaxChar(1030, -1);
+  StringValue allMaxCharVal(allMaxChar);
+  filter->Insert(&allMaxCharVal);
+  filter->MaterializeValues();
+  EXPECT_TRUE(filter->AlwaysTrue());
+
+  // We should still be able to insert into a disabled filter.
+  filter->Insert(&cVal);
+  EXPECT_TRUE(filter->AlwaysTrue());
+
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_TRUE(tFilter.always_true);
+
+  MinMaxFilter* always_true_filter =
+      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_pool);
+  EXPECT_FALSE(always_true_filter->AlwaysFalse());
+  EXPECT_TRUE(always_true_filter->AlwaysTrue());
+
+  mem_pool.FreeAll();
+
+  // Check that a filter that hits the mem limit is disabled.
+  MemTracker limit_mem_tracker(1);
+  MemPool limit_mem_pool(&limit_mem_tracker);
+  // We do not want to start the webserver.
+  FLAGS_enable_webserver = false;
+  std::unique_ptr<TestEnv> env;
+  env.reset(new TestEnv());
+  ASSERT_OK(env->Init());
+
+  MinMaxFilter* limit_filter =
+      MinMaxFilter::Create(string_type, &obj_pool, &limit_mem_pool);
+  EXPECT_FALSE(limit_filter->AlwaysTrue());
+  // The first materialization cannot get memory, which turns the filter always-true.
+  limit_filter->Insert(&cVal);
+  limit_filter->MaterializeValues();
+  EXPECT_TRUE(limit_filter->AlwaysTrue());
+  limit_filter->Insert(&dVal);
+  limit_filter->MaterializeValues();
+  EXPECT_TRUE(limit_filter->AlwaysTrue());
+
+  limit_filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_TRUE(tFilter.always_true);
+
+  // Check the behavior of Or.
+  TMinMaxFilter tFilter1;
+  tFilter1.min.__set_string_val("a");
+  tFilter1.max.__set_string_val("d");
+  TMinMaxFilter tFilter2;
+  tFilter2.min.__set_string_val("b");
+  tFilter2.max.__set_string_val("e");
+  MinMaxFilter::Or(tFilter1, &tFilter2);
+  EXPECT_EQ(tFilter2.min.string_val, "a");
+  EXPECT_EQ(tFilter2.max.string_val, "e");
+}
+
+// Expects that 'filter' reports the TimestampValue bounds 'min' and 'max' and that it is
+// neither always-false nor always-true.
+void CheckTimestampVals(
+    MinMaxFilter* filter, const TimestampValue& min, const TimestampValue& max) {
+  const TimestampValue& found_min = *reinterpret_cast<TimestampValue*>(filter->GetMin());
+  const TimestampValue& found_max = *reinterpret_cast<TimestampValue*>(filter->GetMax());
+  EXPECT_EQ(found_min, min);
+  EXPECT_EQ(found_max, max);
+  EXPECT_FALSE(filter->AlwaysFalse());
+  EXPECT_FALSE(filter->AlwaysTrue());
+}
+
+// Tests that a TimestampMinMaxFilter returns the expected min/max after having values
+// inserted into it, and that MinMaxFilter::Or works for timestamps.
+TEST(MinMaxFilterTest, TestTimestampMinMaxFilter) {
+  ObjectPool obj_pool;
+  MemTracker mem_tracker;
+  MemPool mem_pool(&mem_tracker);
+  ColumnType timestamp_type(PrimitiveType::TYPE_TIMESTAMP);
+  MinMaxFilter* filter = MinMaxFilter::Create(timestamp_type, &obj_pool, &mem_pool);
+
+  // Test the behavior of an empty filter.
+  EXPECT_TRUE(filter->AlwaysFalse());
+  EXPECT_FALSE(filter->AlwaysTrue());
+  TMinMaxFilter tFilter;
+  filter->ToThrift(&tFilter);
+  EXPECT_TRUE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
+  EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
+  // A filter rebuilt from the empty thrift representation must also be empty.
+  MinMaxFilter* empty_filter =
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+  EXPECT_TRUE(empty_filter->AlwaysFalse());
+  EXPECT_FALSE(empty_filter->AlwaysTrue());
+
+  // Now insert some stuff.
+  TimestampValue t1 = TimestampValue::Parse("2000-01-01 00:00:00");
+  filter->Insert(&t1);
+  CheckTimestampVals(filter, t1, t1);
+  TimestampValue t2 = TimestampValue::Parse("1990-01-01 12:30:00");
+  filter->Insert(&t2);
+  CheckTimestampVals(filter, t2, t1);
+  TimestampValue t3 = TimestampValue::Parse("2001-04-30 05:00:00");
+  filter->Insert(&t3);
+  CheckTimestampVals(filter, t2, t3);
+  // t4 lies between t2 and t3, so neither bound moves.
+  TimestampValue t4 = TimestampValue::Parse("2001-04-30 01:00:00");
+  filter->Insert(&t4);
+  CheckTimestampVals(filter, t2, t3);
+
+  // Round-trip the populated filter through its thrift representation.
+  filter->ToThrift(&tFilter);
+  EXPECT_FALSE(tFilter.always_false);
+  EXPECT_FALSE(tFilter.always_true);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
+  MinMaxFilter* filter2 =
+      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_pool);
+  CheckTimestampVals(filter2, t2, t3);
+
+  // Check the behavior of Or.
+  TMinMaxFilter tFilter1;
+  t2.ToTColumnValue(&tFilter1.min);
+  t4.ToTColumnValue(&tFilter1.max);
+  TMinMaxFilter tFilter2;
+  t1.ToTColumnValue(&tFilter2.min);
+  t3.ToTColumnValue(&tFilter2.max);
+  MinMaxFilter::Or(tFilter1, &tFilter2);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
+  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+}
+
+// Test entry point: initializes gtest, the common runtime (in BE_TEST mode) and FE
+// support, then runs every registered test and returns gtest's result.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
+  InitFeSupport();
+  const int test_result = RUN_ALL_TESTS();
+  return test_result;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
new file mode 100644
index 0000000..f50f896
--- /dev/null
+++ b/be/src/util/min-max-filter.cc
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/min-max-filter.h"
+
+#include <sstream>
+#include <unordered_map>
+
+#include "common/object-pool.h"
+#include "runtime/raw-value.h"
+#include "runtime/string-value.inline.h"
+#include "runtime/timestamp-value.inline.h"
+
+using std::numeric_limits;
+using std::stringstream;
+
+namespace impala {
+
+// Lookup table from a PrimitiveType (stored as its int value) to the LLVM_CLASS_NAME of
+// the MinMaxFilter subclass for that type. Read by MinMaxFilter::GetLlvmClassName().
+static std::unordered_map<int, string> MIN_MAX_FILTER_LLVM_CLASS_NAMES = {
+    {PrimitiveType::TYPE_BOOLEAN, BoolMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_TINYINT, TinyIntMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_SMALLINT, SmallIntMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_INT, IntMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_BIGINT, BigIntMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_FLOAT, FloatMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_DOUBLE, DoubleMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_STRING, StringMinMaxFilter::LLVM_CLASS_NAME},
+    {PrimitiveType::TYPE_TIMESTAMP, TimestampMinMaxFilter::LLVM_CLASS_NAME}};
+
+// Lookup table from a PrimitiveType (stored as its int value) to the IRFunction::Type
+// entry named after that type's min-max filter insert function. Read by
+// MinMaxFilter::GetInsertIRFunctionType().
+static std::unordered_map<int, IRFunction::Type> MIN_MAX_FILTER_IR_FUNCTION_TYPES = {
+    {PrimitiveType::TYPE_BOOLEAN, IRFunction::BOOL_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_TINYINT, IRFunction::TINYINT_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_SMALLINT, IRFunction::SMALLINT_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_INT, IRFunction::INT_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_BIGINT, IRFunction::BIGINT_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_FLOAT, IRFunction::FLOAT_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_DOUBLE, IRFunction::DOUBLE_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_STRING, IRFunction::STRING_MIN_MAX_FILTER_INSERT},
+    {PrimitiveType::TYPE_TIMESTAMP, IRFunction::TIMESTAMP_MIN_MAX_FILTER_INSERT}};
+
+// Returns the LLVM class name registered for 'type' in MIN_MAX_FILTER_LLVM_CLASS_NAMES,
+// or an empty string if 'type' has no entry. The lookup goes through find() instead of
+// operator[] so that a miss does not insert a new element into the shared static table.
+string MinMaxFilter::GetLlvmClassName(PrimitiveType type) {
+  auto entry = MIN_MAX_FILTER_LLVM_CLASS_NAMES.find(type);
+  if (entry == MIN_MAX_FILTER_LLVM_CLASS_NAMES.end()) return string();
+  return entry->second;
+}
+
+// Returns the IRFunction::Type registered for 'type' in MIN_MAX_FILTER_IR_FUNCTION_TYPES.
+// If 'type' has no entry, a value-initialized IRFunction::Type is returned, which is what
+// operator[] used to hand back; find() is used so that a miss no longer inserts a new
+// element into the shared static table.
+IRFunction::Type MinMaxFilter::GetInsertIRFunctionType(PrimitiveType type) {
+  auto entry = MIN_MAX_FILTER_IR_FUNCTION_TYPES.find(type);
+  if (entry == MIN_MAX_FILTER_IR_FUNCTION_TYPES.end()) return IRFunction::Type();
+  return entry->second;
+}
+
+// Defines the non-IR members of NAME##MinMaxFilter, a filter whose bounds are stored as
+// TYPE and serialized through the thrift field THRIFT_TYPE##_val:
+//  - LLVM_CLASS_NAME: "class.impala::<NAME>MinMaxFilter".
+//  - Constructor from thrift: an always-false thrift filter sets min_ to the largest and
+//    max_ to the lowest TYPE value; otherwise the bounds are read from thrift.
+//  - type(): returns TYPE_##PRIMITIVE_TYPE.
+//  - ToThrift(): writes the bounds unless the filter is always-false; 'always_true' is
+//    always written as false.
+//  - DebugString(): the bounds are streamed through unary plus so that int8_t bounds
+//    (TinyInt) print as numbers rather than as raw characters; output for the other
+//    instantiated types is unchanged.
+//  - Or(): widens 'out' to cover 'in', or copies 'in' if 'out' is always-false.
+//  - Copy(): copies the bounds of 'in' into 'out'.
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE)  \
+  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                            \
+      "class.impala::" #NAME "MinMaxFilter";                                   \
+  NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) {        \
+    DCHECK(!thrift.always_true);                                               \
+    if (thrift.always_false) {                                                 \
+      min_ = numeric_limits<TYPE>::max();                                      \
+      max_ = numeric_limits<TYPE>::lowest();                                   \
+    } else {                                                                   \
+      DCHECK(thrift.__isset.min);                                              \
+      DCHECK(thrift.__isset.max);                                              \
+      DCHECK(thrift.min.__isset.THRIFT_TYPE##_val);                            \
+      DCHECK(thrift.max.__isset.THRIFT_TYPE##_val);                            \
+      min_ = thrift.min.THRIFT_TYPE##_val;                                     \
+      max_ = thrift.max.THRIFT_TYPE##_val;                                     \
+    }                                                                          \
+  }                                                                            \
+  PrimitiveType NAME##MinMaxFilter::type() {                                   \
+    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                               \
+  }                                                                            \
+  void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {             \
+    if (!AlwaysFalse()) {                                                      \
+      thrift->min.__set_##THRIFT_TYPE##_val(min_);                             \
+      thrift->__isset.min = true;                                              \
+      thrift->max.__set_##THRIFT_TYPE##_val(max_);                             \
+      thrift->__isset.max = true;                                              \
+    }                                                                          \
+    thrift->__set_always_false(AlwaysFalse());                                 \
+    thrift->__set_always_true(false);                                          \
+  }                                                                            \
+  string NAME##MinMaxFilter::DebugString() const {                             \
+    stringstream out;                                                          \
+    out << #NAME << "MinMaxFilter(min=" << +min_ << ", max=" << +max_          \
+        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";     \
+    return out.str();                                                          \
+  }                                                                            \
+  void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {   \
+    if (out->always_false) {                                                   \
+      out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);            \
+      out->__isset.min = true;                                                 \
+      out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);            \
+      out->__isset.max = true;                                                 \
+      out->__set_always_false(false);                                          \
+    } else {                                                                   \
+      out->min.__set_##THRIFT_TYPE##_val(                                      \
+          std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val));     \
+      out->max.__set_##THRIFT_TYPE##_val(                                      \
+          std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val));     \
+    }                                                                          \
+  }                                                                            \
+  void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
+    out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);              \
+    out->__isset.min = true;                                                   \
+    out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);              \
+    out->__isset.max = true;                                                   \
+  }
+
+NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
+NUMERIC_MIN_MAX_FILTER_FUNCS(TinyInt, int8_t, byte, TINYINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(SmallInt, int16_t, short, SMALLINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Int, int32_t, int, INT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(BigInt, int64_t, long, BIGINT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Float, float, double, FLOAT);
+NUMERIC_MIN_MAX_FILTER_FUNCS(Double, double, double, DOUBLE);
+
+// Returns the largest value representable by the integer column type 'type', widened to
+// int64_t. For any non-integer type this DCHECKs and returns -1.
+int64_t GetIntTypeMax(const ColumnType& type) {
+  int64_t type_max = -1;
+  switch (type.type) {
+    case TYPE_TINYINT:
+      type_max = numeric_limits<int8_t>::max();
+      break;
+    case TYPE_SMALLINT:
+      type_max = numeric_limits<int16_t>::max();
+      break;
+    case TYPE_INT:
+      type_max = numeric_limits<int32_t>::max();
+      break;
+    case TYPE_BIGINT:
+      type_max = numeric_limits<int64_t>::max();
+      break;
+    default:
+      DCHECK(false) << "Not an int type: " << type;
+      break;
+  }
+  return type_max;
+}
+
+// Returns the lowest value representable by the integer column type 'type', widened to
+// int64_t. For any non-integer type this DCHECKs and returns -1.
+int64_t GetIntTypeMin(const ColumnType& type) {
+  int64_t type_min = -1;
+  switch (type.type) {
+    case TYPE_TINYINT:
+      type_min = numeric_limits<int8_t>::lowest();
+      break;
+    case TYPE_SMALLINT:
+      type_min = numeric_limits<int16_t>::lowest();
+      break;
+    case TYPE_INT:
+      type_min = numeric_limits<int32_t>::lowest();
+      break;
+    case TYPE_BIGINT:
+      type_min = numeric_limits<int64_t>::lowest();
+      break;
+    default:
+      DCHECK(false) << "Not an int type: " << type;
+      break;
+  }
+  return type_min;
+}
+
+// Defines NAME##MinMaxFilter::GetCastIntMinMax(), which clamps this filter's bounds to
+// the value range of the integer column type 'type'. Returns false if 'min_' is above the
+// type's maximum or 'max_' is below the type's minimum; otherwise writes the clamped
+// bounds to 'out_min'/'out_max' and returns true. As 'out_min' is written before 'max_'
+// is checked, it may already be set when false is returned because of 'max_'.
+#define NUMERIC_MIN_MAX_FILTER_CAST(NAME)                           \
+  bool NAME##MinMaxFilter::GetCastIntMinMax(                        \
+      const ColumnType& type, int64_t* out_min, int64_t* out_max) { \
+    const int64_t lowest = GetIntTypeMin(type);                     \
+    const int64_t highest = GetIntTypeMax(type);                    \
+    if (min_ > highest) return false;                               \
+    *out_min = (min_ < lowest) ? lowest : min_;                     \
+    if (max_ < lowest) return false;                                \
+    *out_max = (max_ > highest) ? highest : max_;                   \
+    return true;                                                    \
+  }
+
+NUMERIC_MIN_MAX_FILTER_CAST(TinyInt);
+NUMERIC_MIN_MAX_FILTER_CAST(SmallInt);
+NUMERIC_MIN_MAX_FILTER_CAST(Int);
+NUMERIC_MIN_MAX_FILTER_CAST(BigInt);
+
+// Defines NAME##MinMaxFilter::GetCastIntMinMax() for filter types that cannot be cast to
+// an integer range: the function DCHECKs and, if that check is compiled out, returns true
+// without writing to 'out_min' or 'out_max'.
+#define NUMERIC_MIN_MAX_FILTER_NO_CAST(NAME)                        \
+  bool NAME##MinMaxFilter::GetCastIntMinMax(                        \
+      const ColumnType& type, int64_t* out_min, int64_t* out_max) { \
+    DCHECK(false) << "Casting min-max filters of type " << #NAME    \
+                  << " not supported.";                             \
+    return true;                                                    \
+  }
+
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Bool);
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Float);
+NUMERIC_MIN_MAX_FILTER_NO_CAST(Double);
+
+// STRING
+// LLVM_CLASS_NAME is the entry registered for TYPE_STRING in
+// MIN_MAX_FILTER_LLVM_CLASS_NAMES. MAX_BOUND_LENGTH is the length that
+// MaterializeValues() truncates longer min/max bounds to.
+const char* StringMinMaxFilter::LLVM_CLASS_NAME = "class.impala::StringMinMaxFilter";
+const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
+
+// Builds a filter from its thrift representation, with both buffers backed by 'mem_pool'.
+// The always-false/always-true flags are taken from 'thrift'; if neither is set, the
+// min/max strings are read from 'thrift' and copied into the filter's own buffers.
+StringMinMaxFilter::StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool)
+  : min_buffer_(mem_pool), max_buffer_(mem_pool) {
+  always_false_ = thrift.always_false;
+  always_true_ = thrift.always_true;
+  // No bounds to read for a filter that is always false or always true.
+  if (always_true_ || always_false_) return;
+
+  DCHECK(thrift.__isset.min);
+  DCHECK(thrift.__isset.max);
+  DCHECK(thrift.min.__isset.string_val);
+  DCHECK(thrift.max.__isset.string_val);
+  min_ = StringValue(thrift.min.string_val);
+  max_ = StringValue(thrift.max.string_val);
+  CopyToBuffer(&min_buffer_, &min_, min_.len);
+  CopyToBuffer(&max_buffer_, &max_, max_.len);
+}
+
+// Returns the primitive type this filter is built for, TYPE_STRING.
+PrimitiveType StringMinMaxFilter::type() {
+  return TYPE_STRING;
+}
+
+// Copies the current bounds into 'min_buffer_'/'max_buffer_' for each buffer that is
+// empty, truncating bounds longer than MAX_BOUND_LENGTH. Does nothing for a filter that
+// is always true or always false. If a copy disables the filter (see CopyToBuffer()), or
+// a truncated max bound consists only of the max char and so cannot be rounded up, the
+// filter ends up always-true.
+void StringMinMaxFilter::MaterializeValues() {
+  if (always_true_ || always_false_) return;
+  if (min_buffer_.IsEmpty()) {
+    if (min_.len > MAX_BOUND_LENGTH) {
+      // Truncating 'value' gives a valid min bound as the result will be <= 'value'.
+      CopyToBuffer(&min_buffer_, &min_, MAX_BOUND_LENGTH);
+    } else {
+      CopyToBuffer(&min_buffer_, &min_, min_.len);
+    }
+    // CopyToBuffer() disables the filter on failure; there is then nothing left to
+    // materialize for the max bound either.
+    if (always_true_) return;
+  }
+  if (max_buffer_.IsEmpty()) {
+    if (max_.len > MAX_BOUND_LENGTH) {
+      CopyToBuffer(&max_buffer_, &max_, MAX_BOUND_LENGTH);
+      if (always_true_) return;
+      // After truncating 'value', to still have a valid max bound we add 1 to one char in
+      // the string, so that the result will be > 'value'. If the entire string is already
+      // the max char, then disable this filter by making it always_true.
+      int i = MAX_BOUND_LENGTH - 1;
+      // Compare as unsigned char so that the max-char check does not depend on whether
+      // plain 'char' is signed on the target platform.
+      while (i >= 0 && static_cast<unsigned char>(max_buffer_.buffer()[i]) == 0xFF) {
+        // Adding one to the max char wraps it to 0 and carries into the previous char.
+        max_buffer_.buffer()[i] = 0;
+        --i;
+      }
+      if (i == -1) {
+        SetAlwaysTrue();
+        return;
+      }
+      max_buffer_.buffer()[i] = max_buffer_.buffer()[i] + 1;
+    } else {
+      CopyToBuffer(&max_buffer_, &max_, max_.len);
+    }
+  }
+}
+
+// Serializes this filter into 'thrift'. The always-false/always-true flags are always
+// written; the min/max strings are written only if neither flag is set.
+void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+  thrift->__set_always_false(always_false_);
+  thrift->__set_always_true(always_true_);
+  if (always_true_ || always_false_) return;
+
+  thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
+  thrift->min.__isset.string_val = true;
+  thrift->__isset.min = true;
+
+  thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
+  thrift->max.__isset.string_val = true;
+  thrift->__isset.max = true;
+}
+
+// Returns a one-line description of this filter: its min and max bounds and its
+// always_false/always_true flags.
+string StringMinMaxFilter::DebugString() const {
+  const char* const always_false_str = always_false_ ? "true" : "false";
+  const char* const always_true_str = always_true_ ? "true" : "false";
+  stringstream out;
+  out << "StringMinMaxFilter(min=" << min_ << ", max=" << max_;
+  out << ", always_false=" << always_false_str;
+  out << ", always_true=" << always_true_str << ")";
+  return out.str();
+}
+
+// Merges the string bounds of 'in' into 'out'. If 'out' is always-false, it takes over
+// the bounds of 'in' and stops being always-false; otherwise each bound of 'out' is
+// replaced only where 'in' extends the range.
+void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  if (!out->always_false) {
+    if (StringValue(in.min.string_val) < StringValue(out->min.string_val)) {
+      out->min.__set_string_val(in.min.string_val);
+    }
+    if (StringValue(in.max.string_val) > StringValue(out->max.string_val)) {
+      out->max.__set_string_val(in.max.string_val);
+    }
+    return;
+  }
+  out->min.__set_string_val(in.min.string_val);
+  out->__isset.min = true;
+  out->max.__set_string_val(in.max.string_val);
+  out->__isset.max = true;
+  out->__set_always_false(false);
+}
+
+// Copies the min and max string values of 'in' into 'out' and marks both as set.
+void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->max.__set_string_val(in.max.string_val);
+  out->min.__set_string_val(in.min.string_val);
+  out->__isset.max = true;
+  out->__isset.min = true;
+}
+
+// Replaces the contents of 'buffer' with the first 'len' bytes of 'value' and repoints
+// 'value' at the buffer with length 'len'. Does nothing if 'value' already points at the
+// buffer. If appending to the buffer fails, the filter is made always-true instead.
+void StringMinMaxFilter::CopyToBuffer(
+    StringBuffer* buffer, StringValue* value, int64_t len) {
+  const bool already_buffered = value->ptr == buffer->buffer();
+  if (already_buffered) return;
+  buffer->Clear();
+  if (buffer->Append(value->ptr, len).ok()) {
+    value->ptr = buffer->buffer();
+    value->len = len;
+    return;
+  }
+  // If Append() fails, for example because we're out of memory, disable the filter.
+  SetAlwaysTrue();
+}
+
+void StringMinMaxFilter::SetAlwaysTrue() {
+  always_true_ = true;
+  max_buffer_.Clear();
+  min_buffer_.Clear();
+  min_.ptr = nullptr;
+  min_.len = 0;
+  max_.ptr = nullptr;
+  max_.len = 0;
+}
+
+// TIMESTAMP
+const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
+    "class.impala::TimestampMinMaxFilter";
+
+TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
+  always_false_ = thrift.always_false;
+  if (!always_false_) {
+    DCHECK(thrift.min.__isset.timestamp_val);
+    DCHECK(thrift.max.__isset.timestamp_val);
+    min_ = TimestampValue::FromTColumnValue(thrift.min);
+    max_ = TimestampValue::FromTColumnValue(thrift.max);
+  }
+}
+
+PrimitiveType TimestampMinMaxFilter::type() {
+  return PrimitiveType::TYPE_TIMESTAMP;
+}
+
+void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+  if (!always_false_) {
+    min_.ToTColumnValue(&thrift->min);
+    thrift->__isset.min = true;
+    max_.ToTColumnValue(&thrift->max);
+    thrift->__isset.max = true;
+  }
+  thrift->__set_always_false(always_false_);
+  thrift->__set_always_true(false);
+}
+
+string TimestampMinMaxFilter::DebugString() const {
+  stringstream out;
+  out << "TimestampMinMaxFilter(min=" << min_ << ", max=" << max_
+      << " always_false=" << (always_false_ ? "true" : "false") << ")";
+  return out.str();
+}
+
+void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  if (out->always_false) {
+    out->min.__set_timestamp_val(in.min.timestamp_val);
+    out->__isset.min = true;
+    out->max.__set_timestamp_val(in.max.timestamp_val);
+    out->__isset.max = true;
+    out->__set_always_false(false);
+  } else {
+    TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
+    TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
+    if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
+    TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
+    TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
+    if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
+  }
+}
+
+void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->min.__set_timestamp_val(in.min.timestamp_val);
+  out->__isset.min = true;
+  out->max.__set_timestamp_val(in.max.timestamp_val);
+  out->__isset.max = true;
+}
+
+// MinMaxFilter
+bool MinMaxFilter::GetCastIntMinMax(
+    const ColumnType& type, int64_t* out_min, int64_t* out_max) {
+  DCHECK(false) << "Casting min-max filters of type " << this->type()
+      << " not supported.";
+  return true;
+}
+
+MinMaxFilter* MinMaxFilter::Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+  switch (type.type) {
+    case PrimitiveType::TYPE_BOOLEAN:
+      return pool->Add(new BoolMinMaxFilter());
+    case PrimitiveType::TYPE_TINYINT:
+      return pool->Add(new TinyIntMinMaxFilter());
+    case PrimitiveType::TYPE_SMALLINT:
+      return pool->Add(new SmallIntMinMaxFilter());
+    case PrimitiveType::TYPE_INT:
+      return pool->Add(new IntMinMaxFilter());
+    case PrimitiveType::TYPE_BIGINT:
+      return pool->Add(new BigIntMinMaxFilter());
+    case PrimitiveType::TYPE_FLOAT:
+      return pool->Add(new FloatMinMaxFilter());
+    case PrimitiveType::TYPE_DOUBLE:
+      return pool->Add(new DoubleMinMaxFilter());
+    case PrimitiveType::TYPE_STRING:
+      return pool->Add(new StringMinMaxFilter(mem_pool));
+    case PrimitiveType::TYPE_TIMESTAMP:
+      return pool->Add(new TimestampMinMaxFilter());
+    default:
+      DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
+  }
+  return nullptr;
+}
+
+MinMaxFilter* MinMaxFilter::Create(
+    const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool) {
+  switch (type.type) {
+    case PrimitiveType::TYPE_BOOLEAN:
+      return pool->Add(new BoolMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_TINYINT:
+      return pool->Add(new TinyIntMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_SMALLINT:
+      return pool->Add(new SmallIntMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_INT:
+      return pool->Add(new IntMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_BIGINT:
+      return pool->Add(new BigIntMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_FLOAT:
+      return pool->Add(new FloatMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_DOUBLE:
+      return pool->Add(new DoubleMinMaxFilter(thrift));
+    case PrimitiveType::TYPE_STRING:
+      return pool->Add(new StringMinMaxFilter(thrift, mem_pool));
+    case PrimitiveType::TYPE_TIMESTAMP:
+      return pool->Add(new TimestampMinMaxFilter(thrift));
+    default:
+      DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
+  }
+  return nullptr;
+}
+
+void MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  if (in.always_false || out->always_true) return;
+  if (in.always_true) {
+    out->__set_always_true(true);
+    return;
+  }
+  if (in.min.__isset.bool_val) {
+    DCHECK(out->min.__isset.bool_val);
+    BoolMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.byte_val) {
+    DCHECK(out->min.__isset.byte_val);
+    TinyIntMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.short_val) {
+    DCHECK(out->min.__isset.short_val);
+    SmallIntMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.int_val) {
+    DCHECK(out->min.__isset.int_val);
+    IntMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.long_val) {
+    DCHECK(out->min.__isset.long_val);
+    BigIntMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.double_val) {
+    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+    DCHECK(out->min.__isset.double_val);
+    DoubleMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.string_val) {
+    DCHECK(out->min.__isset.string_val);
+    StringMinMaxFilter::Or(in, out);
+    return;
+  } else if (in.min.__isset.timestamp_val) {
+    DCHECK(out->min.__isset.timestamp_val);
+    TimestampMinMaxFilter::Or(in, out);
+    return;
+  }
+  DCHECK(false) << "Unsupported MinMaxFilter type.";
+}
+
+void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
+  out->__set_always_false(in.always_false);
+  out->__set_always_true(in.always_true);
+  if (in.always_false || in.always_true) return;
+  if (in.min.__isset.bool_val) {
+    DCHECK(!out->min.__isset.bool_val);
+    BoolMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.byte_val) {
+    DCHECK(!out->min.__isset.byte_val);
+    TinyIntMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.short_val) {
+    DCHECK(!out->min.__isset.short_val);
+    SmallIntMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.int_val) {
+    DCHECK(!out->min.__isset.int_val);
+    IntMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.long_val) {
+    // Timestamp filters set 'timestamp_val' and are handled by a separate branch below.
+    DCHECK(!out->min.__isset.long_val);
+    BigIntMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.double_val) {
+    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+    DCHECK(!out->min.__isset.double_val);
+    DoubleMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.string_val) {
+    DCHECK(!out->min.__isset.string_val);
+    StringMinMaxFilter::Copy(in, out);
+    return;
+  } else if (in.min.__isset.timestamp_val) {
+    DCHECK(!out->min.__isset.timestamp_val);
+    TimestampMinMaxFilter::Copy(in, out);
+    return;
+  }
+  DCHECK(false) << "Unsupported MinMaxFilter type.";
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/be/src/util/min-max-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
new file mode 100644
index 0000000..556f5fa
--- /dev/null
+++ b/be/src/util/min-max-filter.h
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_MIN_MAX_FILTER_H
+#define IMPALA_UTIL_MIN_MAX_FILTER_H
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "impala-ir/impala-ir-functions.h"
+#include "runtime/string-buffer.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+
+namespace impala {
+
+class MemPool;
+class ObjectPool;
+
+/// A MinMaxFilter tracks the min and max currently seen values in a data set for use in
+/// runtime filters.
+///
+/// Filters are constructed using MinMaxFilter::Create() which returns a MinMaxFilter of
+/// the appropriate type. Values can then be added using Insert(), and the min and max can
+/// be retrieved using GetMin()/GetMax().
+///
+/// MinMaxFilters ignore NULL values, and so are only appropriate to use as a runtime
+/// filter if the join predicate is '=' and not 'is not distinct from'.
+class MinMaxFilter {
+ public:
+  virtual ~MinMaxFilter() {}
+
+  /// Returns the min/max values in the tuple slot representation. It is not valid to call
+  /// these functions if AlwaysFalse() returns true.
+  virtual void* GetMin() = 0;
+  virtual void* GetMax() = 0;
+
+  /// Returns the min/max values in the out parameters 'out_min'/'out_max', converted to
+  /// fit into 'type', e.g. if the calculated max value is greater than the max value for
+  /// 'type', the returned max is the max for 'type'. Returns false if the entire range
+  /// from the calculated min to max is outside the range for 'type'. May only be called
+  /// for integer-typed filters.
+  virtual bool GetCastIntMinMax(
+      const ColumnType& type, int64_t* out_min, int64_t* out_max);
+
+  virtual PrimitiveType type() = 0;
+
+  /// Add a new value, updating the current min/max.
+  virtual void Insert(void* val) = 0;
+
+  /// If true, this filter allows all rows to pass.
+  virtual bool AlwaysTrue() const = 0;
+
+  /// If true, this filter doesn't allow any rows to pass.
+  virtual bool AlwaysFalse() const = 0;
+
+  /// Materialize filter values by copying any values stored by filters into memory owned
+  /// by the filter. Filters may assume that the memory for Insert()-ed values stays valid
+  /// until this is called.
+  virtual void MaterializeValues() {}
+
+  /// Convert this filter to a thrift representation.
+  virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
+
+  virtual std::string DebugString() const = 0;
+
+  /// Returns a new MinMaxFilter with the given type, allocated from 'pool'.
+  static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+
+  /// Returns a new MinMaxFilter created from the thrift representation, allocated from
+  /// 'pool'.
+  static MinMaxFilter* Create(
+      const TMinMaxFilter& thrift, ColumnType type, ObjectPool* pool, MemPool* mem_pool);
+
+  /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+  /// Copies the contents of 'in' into 'out'.
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+  /// Returns the LLVM_CLASS_NAME for the given type.
+  static std::string GetLlvmClassName(PrimitiveType type);
+
+  /// Returns the IRFunction::Type for Insert() for the given type.
+  static IRFunction::Type GetInsertIRFunctionType(PrimitiveType type);
+};
+
+#define NUMERIC_MIN_MAX_FILTER(NAME, TYPE)                                    \
+  class NAME##MinMaxFilter : public MinMaxFilter {                            \
+   public:                                                                    \
+    NAME##MinMaxFilter() {                                                    \
+      min_ = std::numeric_limits<TYPE>::max();                                \
+      max_ = std::numeric_limits<TYPE>::lowest();                             \
+    }                                                                         \
+    NAME##MinMaxFilter(const TMinMaxFilter& thrift);                          \
+    virtual ~NAME##MinMaxFilter() {}                                          \
+    virtual void* GetMin() override { return &min_; }                         \
+    virtual void* GetMax() override { return &max_; }                         \
+    virtual bool GetCastIntMinMax(                                            \
+        const ColumnType& type, int64_t* out_min, int64_t* out_max) override; \
+    virtual PrimitiveType type() override;                                    \
+    virtual void Insert(void* val) override;                                  \
+    virtual bool AlwaysTrue() const override { return false; }                \
+    virtual bool AlwaysFalse() const override {                               \
+      return min_ == std::numeric_limits<TYPE>::max()                         \
+          && max_ == std::numeric_limits<TYPE>::lowest();                     \
+    }                                                                         \
+    virtual void ToThrift(TMinMaxFilter* thrift) const override;              \
+    virtual std::string DebugString() const override;                         \
+    static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);              \
+    static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);            \
+    static const char* LLVM_CLASS_NAME;                                       \
+                                                                              \
+   private:                                                                   \
+    TYPE min_;                                                                \
+    TYPE max_;                                                                \
+  };
+
+NUMERIC_MIN_MAX_FILTER(Bool, bool);
+NUMERIC_MIN_MAX_FILTER(TinyInt, int8_t);
+NUMERIC_MIN_MAX_FILTER(SmallInt, int16_t);
+NUMERIC_MIN_MAX_FILTER(Int, int32_t);
+NUMERIC_MIN_MAX_FILTER(BigInt, int64_t);
+NUMERIC_MIN_MAX_FILTER(Float, float);
+NUMERIC_MIN_MAX_FILTER(Double, double);
+
+class StringMinMaxFilter : public MinMaxFilter {
+ public:
+  StringMinMaxFilter(MemPool* mem_pool)
+    : min_buffer_(mem_pool),
+      max_buffer_(mem_pool),
+      always_false_(true),
+      always_true_(false) {}
+  StringMinMaxFilter(const TMinMaxFilter& thrift, MemPool* mem_pool);
+  virtual ~StringMinMaxFilter() {}
+
+  virtual void* GetMin() override { return &min_; }
+  virtual void* GetMax() override { return &max_; }
+  virtual PrimitiveType type() override;
+
+  virtual void Insert(void* val) override;
+  virtual bool AlwaysTrue() const override { return always_true_; }
+  virtual bool AlwaysFalse() const override { return always_false_; }
+
+  /// Copies the values pointed to by 'min_'/'max_' into 'min_buffer_'/'max_buffer_',
+  /// truncating them if necessary.
+  virtual void MaterializeValues() override;
+
+  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual std::string DebugString() const override;
+
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+  /// Struct name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
+
+ private:
+  /// Copies the contents of 'value' into 'buffer', up to 'len', and reassigns 'value' to
+  /// point to 'buffer'. If an oom is hit, disables the filter by setting 'always_true_'
+  /// to true.
+  void CopyToBuffer(StringBuffer* buffer, StringValue* value, int64_t len);
+
+  /// Sets 'always_true_' to true and clears the values of 'min_', 'max_', 'min_buffer_',
+  /// and 'max_buffer_'.
+  void SetAlwaysTrue();
+
+  /// The maximum length of string to store in 'min_buffer_' or 'max_buffer_'. Strings
+  /// inserted into this filter that are longer than this will be truncated.
+  static const int MAX_BOUND_LENGTH;
+
+  /// The min/max values. After a call to MaterializeValues() these will point to
+  /// 'min_buffer_'/'max_buffer_'.
+  StringValue min_;
+  StringValue max_;
+
+  /// Local buffers to copy min/max data into. If Insert() was called and 'min_'/'max_'
+  /// was updated, these will be empty until MaterializeValues() is called.
+  StringBuffer min_buffer_;
+  StringBuffer max_buffer_;
+
+  /// True if no rows have been inserted.
+  bool always_false_;
+  bool always_true_;
+};
+
+class TimestampMinMaxFilter : public MinMaxFilter {
+ public:
+  TimestampMinMaxFilter() { always_false_ = true; }
+  TimestampMinMaxFilter(const TMinMaxFilter& thrift);
+  virtual ~TimestampMinMaxFilter() {}
+
+  virtual void* GetMin() override { return &min_; }
+  virtual void* GetMax() override { return &max_; }
+  virtual PrimitiveType type() override;
+
+  virtual void Insert(void* val) override;
+  virtual bool AlwaysTrue() const override { return false; }
+  virtual bool AlwaysFalse() const override { return always_false_; }
+  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual std::string DebugString() const override;
+
+  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+
+  /// Struct name in LLVM IR.
+  static const char* LLVM_CLASS_NAME;
+
+ private:
+  TimestampValue min_;
+  TimestampValue max_;
+
+  /// True if no rows have been inserted.
+  bool always_false_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/Data.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Data.thrift b/common/thrift/Data.thrift
index 61d1988..6361e7e 100644
--- a/common/thrift/Data.thrift
+++ b/common/thrift/Data.thrift
@@ -28,6 +28,7 @@ struct TColumnValue {
   4: optional double double_val
   5: optional string string_val
   8: optional binary binary_val
+  9: optional binary timestamp_val
 }
 
 struct TResultRow {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1f261f4..1fd9c25 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -33,6 +33,7 @@ include "DataSinks.thrift"
 include "Results.thrift"
 include "RuntimeProfile.thrift"
 include "ImpalaService.thrift"
+include "Data.thrift"
 
 // constants for TQueryOptions.num_nodes
 const i32 NUM_NODES_ALL = 0
@@ -191,14 +192,14 @@ struct TQueryOptions {
   // be rounded up to the nearest power of two.
   38: optional i32 runtime_bloom_filter_size = 1048576
 
-  // Time in ms to wait until partition filters are delivered. If 0, the default defined
+  // Time in ms to wait until runtime filters are delivered. If 0, the default defined
   // by the startup flag of the same name is used.
   39: optional i32 runtime_filter_wait_time_ms = 0
 
   // If true, per-row runtime filtering is disabled
   40: optional bool disable_row_runtime_filtering = false
 
-  // Maximum number of runtime filters allowed per query
+  // Maximum number of bloom runtime filters allowed per query
   41: optional i32 max_num_runtime_filters = 10
 
   // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
@@ -226,10 +227,10 @@ struct TQueryOptions {
   // the files there.
   45: optional bool s3_skip_insert_staging = true
 
-  // Minimum runtime filter size, in bytes
+  // Minimum runtime bloom filter size, in bytes
   46: optional i32 runtime_filter_min_size = 1048576
 
-  // Maximum runtime filter size, in bytes
+  // Maximum runtime bloom filter size, in bytes
   47: optional i32 runtime_filter_max_size = 16777216
 
   // Prefetching behavior during hash tables' building and probing.
@@ -771,6 +772,16 @@ struct TBloomFilter {
   4: required bool always_false
 }
 
+struct TMinMaxFilter {
+  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
+  1: required bool always_true
+
+  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
+  2: required bool always_false
+
+  3: optional Data.TColumnValue min
+  4: optional Data.TColumnValue max
+}
 
 // UpdateFilter
 
@@ -787,6 +798,8 @@ struct TUpdateFilterParams {
 
   // required in V1
   4: optional TBloomFilter bloom_filter
+
+  5: optional TMinMaxFilter min_max_filter
 }
 
 struct TUpdateFilterResult {
@@ -812,6 +825,8 @@ struct TPublishFilterParams {
   // Actual bloom_filter payload
   // required in V1
   5: optional TBloomFilter bloom_filter
+
+  6: optional TMinMaxFilter min_max_filter
 }
 
 struct TPublishFilterResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index b8a073e..061da00 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -199,13 +199,13 @@ enum TImpalaQueryOptions {
   // two.
   RUNTIME_BLOOM_FILTER_SIZE,
 
-  // Time (in ms) to wait in scans for partition filters to arrive.
+  // Time (in ms) to wait in scans for runtime filters to arrive.
   RUNTIME_FILTER_WAIT_TIME_MS,
 
   // If true, disable application of runtime filters to individual rows.
   DISABLE_ROW_RUNTIME_FILTERING,
 
-  // Maximum number of runtime filters allowed per query.
+  // Maximum number of bloom runtime filters allowed per query.
   MAX_NUM_RUNTIME_FILTERS,
 
   // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
@@ -227,10 +227,10 @@ enum TImpalaQueryOptions {
   // TODO: Find a way to get this working for INSERT OVERWRITEs too.
   S3_SKIP_INSERT_STAGING,
 
-  // Maximum runtime filter size, in bytes.
+  // Maximum runtime bloom filter size, in bytes.
   RUNTIME_FILTER_MAX_SIZE,
 
-  // Minimum runtime filter size, in bytes.
+  // Minimum runtime bloom filter size, in bytes.
   RUNTIME_FILTER_MIN_SIZE,
 
   // Prefetching behavior during hash tables' building and probing.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c04d08a..97ef1b3 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -98,6 +98,16 @@ struct TRuntimeFilterTargetDesc {
   // Indicates if this target is on the same fragment as the join that
   // produced the runtime filter
   5: required bool is_local_target
+
+  // If the target node is a Kudu scan node, the name (in the letter case in which it
+  // appears in Kudu) and type of the targeted column.
+  6: optional string kudu_col_name
+  7: optional Types.TColumnType kudu_col_type;
+}
+
+enum TRuntimeFilterType {
+  BLOOM,
+  MIN_MAX
 }
 
 // Specification of a runtime filter.
@@ -132,6 +142,9 @@ struct TRuntimeFilterDesc {
   // The estimated number of distinct values that the planner expects the filter to hold.
   // Used to compute the size of the filter.
   9: optional i64 ndv_estimate
+
+  // The type of runtime filter to build.
+  10: required TRuntimeFilterType type
 }
 
 // The information contained in subclasses of ScanNode captured in two separate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 917d918..d04a15e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -189,7 +189,7 @@ public class HashJoinNode extends JoinNode {
       }
       if (!runtimeFilters_.isEmpty()) {
         output.append(detailPrefix + "runtime filters: ");
-        output.append(getRuntimeFilterExplainString(true));
+        output.append(getRuntimeFilterExplainString(true, detailLevel));
       }
     }
     return output.toString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ad8501a..44f58eb 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -978,7 +978,7 @@ public class HdfsScanNode extends ScanNode {
       }
       if (!runtimeFilters_.isEmpty()) {
         output.append(detailPrefix + "runtime filters: ");
-        output.append(getRuntimeFilterExplainString(false));
+        output.append(getRuntimeFilterExplainString(false, detailLevel));
       }
     }
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index cbc132b..390592e 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -295,6 +295,10 @@ public class KuduScanNode extends ScanNode {
           result.append(detailPrefix + "kudu predicates: " + getExplainString(
               kuduConjuncts_) + "\n");
         }
+        if (!runtimeFilters_.isEmpty()) {
+          result.append(detailPrefix + "runtime filters: ");
+          result.append(getRuntimeFilterExplainString(false, detailLevel));
+        }
       }
     }
     return result.toString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index c149b5c..a14c89a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -705,25 +705,28 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
 
   protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; }
 
-  protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+  protected String getRuntimeFilterExplainString(
+      boolean isBuildNode, TExplainLevel detailLevel) {
     if (runtimeFilters_.isEmpty()) return "";
-    final String applyNodeFilterFormat = "%s -> %s";
-    final String buildNodeFilterFormat = "%s <- %s";
-    String format = isBuildNode ? buildNodeFilterFormat : applyNodeFilterFormat;
-    StringBuilder output = new StringBuilder();
     List<String> filtersStr = Lists.newArrayList();
     for (RuntimeFilter filter: runtimeFilters_) {
-      Expr expr = null;
+      StringBuilder filterStr = new StringBuilder();
+      filterStr.append(filter.getFilterId());
+      if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+        filterStr.append("[");
+        filterStr.append(filter.getType().toString().toLowerCase());
+        filterStr.append("]");
+      }
       if (isBuildNode) {
-        expr = filter.getSrcExpr();
+        filterStr.append(" <- ");
+        filterStr.append(filter.getSrcExpr().toSql());
       } else {
-        expr = filter.getTargetExpr(getId());
+        filterStr.append(" -> ");
+        filterStr.append(filter.getTargetExpr(getId()).toSql());
       }
-      Preconditions.checkNotNull(expr);
-      filtersStr.add(String.format(format, filter.getFilterId(), expr.toSql()));
+      filtersStr.add(filterStr.toString());
     }
-    output.append(Joiner.on(", ").join(filtersStr) + "\n");
-    return output.toString();
+    return Joiner.on(", ").join(filtersStr) + "\n";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 98365e6..758e79d 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -26,6 +26,8 @@ import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.analysis.CastExpr;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.Predicate;
@@ -35,6 +37,7 @@ import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -44,6 +47,7 @@ import org.apache.impala.planner.PlanNode;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
 import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
+import org.apache.impala.thrift.TRuntimeFilterType;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -107,6 +111,8 @@ public final class RuntimeFilterGenerator {
     private final Expr srcExpr_;
     // Expr (lhs of join predicate) from which the targetExprs_ are generated.
     private final Expr origTargetExpr_;
+    // The operator comparing 'srcExpr_' and 'origTargetExpr_'.
+    private final Operator exprCmpOp_;
     // Runtime filter targets
     private final List<RuntimeFilterTarget> targets_ = Lists.newArrayList();
     // Slots from base table tuples that have value transfer from the slots
@@ -131,6 +137,8 @@ public final class RuntimeFilterGenerator {
     // If set, indicates that the filter can't be assigned to another scan node.
     // Once set, it can't be unset.
     private boolean finalized_ = false;
+    // The type of filter to build.
+    private TRuntimeFilterType type_;
 
     /**
      * Internal representation of a runtime filter target.
@@ -165,6 +173,14 @@ public final class RuntimeFilterGenerator {
         tFilterTarget.setTarget_expr_slotids(tSlotIds);
         tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
         tFilterTarget.setIs_local_target(isLocalTarget);
+        if (node instanceof KuduScanNode) {
+          // assignRuntimeFilters() only assigns KuduScanNode targets if the target expr
+          // is a slot ref, possibly with an implicit cast, pointing to a column.
+          SlotRef slotRef = expr.unwrapSlotRef(true);
+          KuduColumn col = (KuduColumn) slotRef.getDesc().getColumn();
+          tFilterTarget.setKudu_col_name(col.getKuduName());
+          tFilterTarget.setKudu_col_type(col.getType().toThrift());
+        }
         return tFilterTarget;
       }
 
@@ -179,13 +195,16 @@ public final class RuntimeFilterGenerator {
       }
     }
 
-    private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode,
-        Expr srcExpr, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots) {
+    private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
+        Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
+        TRuntimeFilterType type) {
       id_ = filterId;
       src_ = filterSrcNode;
       srcExpr_ = srcExpr;
       origTargetExpr_ = origTargetExpr;
+      exprCmpOp_ = exprCmpOp;
       targetSlotsByTid_ = targetSlots;
+      type_ = type;
       computeNdvEstimate();
     }
 
@@ -221,6 +240,7 @@ public final class RuntimeFilterGenerator {
             appliedOnPartitionColumns && target.isBoundByPartitionColumns;
       }
       tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
+      tFilter.setType(type_);
       return tFilter;
     }
 
@@ -230,7 +250,8 @@ public final class RuntimeFilterGenerator {
      * or null if a runtime filter cannot be generated from the specified predicate.
      */
     public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
-        Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode) {
+        Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
+        TRuntimeFilterType type) {
       Preconditions.checkNotNull(idGen);
       Preconditions.checkNotNull(joinPredicate);
       Preconditions.checkNotNull(filterSrcNode);
@@ -256,8 +277,8 @@ public final class RuntimeFilterGenerator {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Generating runtime filter from predicate " + joinPredicate);
       }
-      return new RuntimeFilter(idGen.getNextId(), filterSrcNode,
-          srcExpr, targetExpr, targetSlots);
+      return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
+          normalizedJoinConjunct.getOp(), targetSlots, type);
     }
 
     /**
@@ -337,6 +358,8 @@ public final class RuntimeFilterGenerator {
     public Expr getOrigTargetExpr() { return origTargetExpr_; }
     public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid_; }
     public RuntimeFilterId getFilterId() { return id_; }
+    public TRuntimeFilterType getType() { return type_; }
+    public Operator getExprCompOp() { return exprCmpOp_; }
 
     /**
      * Estimates the selectivity of a runtime filter as the cardinality of the
@@ -394,14 +417,14 @@ public final class RuntimeFilterGenerator {
   public static void generateRuntimeFilters(PlannerContext ctx, PlanNode plan) {
     Preconditions.checkNotNull(ctx);
     Preconditions.checkNotNull(ctx.getQueryOptions());
-    int maxNumFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
-    Preconditions.checkState(maxNumFilters >= 0);
+    int maxNumBloomFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
+    Preconditions.checkState(maxNumBloomFilters >= 0);
     RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator();
     filterGenerator.generateFilters(ctx, plan);
     List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
-    if (filters.size() > maxNumFilters) {
-      // If more than 'maxNumFilters' were generated, sort them by increasing selectivity
-      // and keep the 'maxNumFilters' most selective.
+    if (filters.size() > maxNumBloomFilters) {
+      // If more than 'maxNumBloomFilters' were generated, sort them by increasing
+      // selectivity and keep the 'maxNumBloomFilters' most selective bloom filters.
       Collections.sort(filters, new Comparator<RuntimeFilter>() {
           public int compare(RuntimeFilter a, RuntimeFilter b) {
             double aSelectivity =
@@ -413,8 +436,14 @@ public final class RuntimeFilterGenerator {
         }
       );
     }
-    for (RuntimeFilter filter:
-         filters.subList(0, Math.min(filters.size(), maxNumFilters))) {
+    // We only enforce a limit on the number of bloom filters as they are much more
+    // heavy-weight than the other filter types.
+    int numBloomFilters = 0;
+    for (RuntimeFilter filter : filters) {
+      if (filter.getType() == TRuntimeFilterType.BLOOM) {
+        if (numBloomFilters >= maxNumBloomFilters) continue;
+        ++numBloomFilters;
+      }
       filter.setIsBroadcast(
           filter.src_.getDistributionMode() == DistributionMode.BROADCAST);
       filter.computeHasLocalTargets();
@@ -462,12 +491,14 @@ public final class RuntimeFilterGenerator {
       }
       joinConjuncts.addAll(joinNode.getConjuncts());
       List<RuntimeFilter> filters = Lists.newArrayList();
-      for (Expr conjunct: joinConjuncts) {
-        RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
-            ctx.getRootAnalyzer(), conjunct, joinNode);
-        if (filter == null) continue;
-        registerRuntimeFilter(filter);
-        filters.add(filter);
+      for (TRuntimeFilterType type : TRuntimeFilterType.values()) {
+        for (Expr conjunct : joinConjuncts) {
+          RuntimeFilter filter = RuntimeFilter.create(
+              filterIdGenerator, ctx.getRootAnalyzer(), conjunct, joinNode, type);
+          if (filter == null) continue;
+          registerRuntimeFilter(filter);
+          filters.add(filter);
+        }
       }
       generateFilters(ctx, root.getChild(0));
       // Finalize every runtime filter of that join. This is to ensure that we don't
@@ -538,11 +569,14 @@ public final class RuntimeFilterGenerator {
    * 2. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned
    *    to 'scanNode' if the filter is produced within the same fragment that contains the
    *    scan node.
+   * 3. Only Hdfs and Kudu scan nodes are supported:
+   *     a. If the target is an HdfsScanNode, the filter must be type BLOOM.
+   *     b. If the target is a KuduScanNode, the filter must be type MIN_MAX, the target
+   *         must be a slot ref on a column, and the op cannot be IS NOT DISTINCT FROM.
    * A scan node may be used as a destination node for multiple runtime filters.
-   * Currently, runtime filters can only be assigned to HdfsScanNodes.
    */
   private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
-    if (!(scanNode instanceof HdfsScanNode)) return;
+    if (!(scanNode instanceof HdfsScanNode || scanNode instanceof KuduScanNode)) return;
     TupleId tid = scanNode.getTupleIds().get(0);
     if (!runtimeFiltersByTid_.containsKey(tid)) return;
     Analyzer analyzer = ctx.getRootAnalyzer();
@@ -558,6 +592,26 @@ public final class RuntimeFilterGenerator {
       if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
       boolean isLocalTarget = isLocalTarget(filter, scanNode);
       if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;
+
+      // Check that the scan node supports applying filters of this type and targetExpr.
+      if (scanNode instanceof HdfsScanNode
+          && filter.getType() != TRuntimeFilterType.BLOOM) {
+        continue;
+      } else if (scanNode instanceof KuduScanNode) {
+        if (filter.getType() != TRuntimeFilterType.MIN_MAX) continue;
+        SlotRef slotRef = targetExpr.unwrapSlotRef(true);
+        // Kudu only supports targeting a single column, not general exprs, so the target
+        // must be a SlotRef pointing to a column. We can allow implicit integer casts
+        // by casting the min/max values before sending them to Kudu.
+        // Kudu also cannot currently return nulls if a filter is applied, so it does not
+        // work with "IS NOT DISTINCT FROM" predicates.
+        if (slotRef == null || slotRef.getDesc().getColumn() == null
+            || (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
+            || filter.getExprCompOp() == Operator.NOT_DISTINCT) {
+          continue;
+        }
+      }
+
       RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
           scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget);
       filter.addTarget(target);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index f49e39c..760334d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -497,4 +497,11 @@ public class PlannerTest extends PlannerTestBase {
     requestWithDisableSpillOn = frontend_.createExecRequest(queryCtx, explainBuilder);
     Assert.assertNotNull(requestWithDisableSpillOn);
   }
+
+  @Test
+  public void testMinMaxRuntimeFilters() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("min-max-runtime-filters", options);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 15db74d..cf5a78b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -1245,7 +1245,7 @@ PLAN-ROOT SINK
 |
 03:HASH JOIN [INNER JOIN, PARTITIONED]
 |  hash predicates: l_orderkey = o_orderkey, l_returnflag = o_clerk
-|  runtime filters: RF002 <- o_orderkey, RF003 <- o_clerk
+|  runtime filters: RF004 <- o_orderkey, RF005 <- o_clerk
 |
 |--07:EXCHANGE [HASH(o_orderkey,o_clerk)]
 |  |
@@ -1257,7 +1257,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.92MB
-   runtime filters: RF002 -> l_orderkey, RF003 -> l_returnflag
+   runtime filters: RF004 -> l_orderkey, RF005 -> l_returnflag
 ====
 # IMPALA-4263: Grouping agg needs a merge step because the grouping exprs reference a
 # tuple that is made nullable in the join fragment.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
index 9dc9f22..5571b21 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
@@ -12,7 +12,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_customer_sk = c_customer_sk
 |  fk/pk conjuncts: ss_customer_sk = c_customer_sk
-|  runtime filters: RF000 <- c_customer_sk
+|  runtime filters: RF000[bloom] <- c_customer_sk
 |  mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB
 |  tuple-ids=0,1 row-size=355B cardinality=529700
 |
@@ -28,7 +28,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_customer_sk
+   runtime filters: RF000[bloom] -> ss_customer_sk
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -87,7 +87,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [RIGHT OUTER JOIN]
 |  hash predicates: ss_customer_sk = c_customer_sk
 |  fk/pk conjuncts: ss_customer_sk = c_customer_sk
-|  runtime filters: RF000 <- c_customer_sk
+|  runtime filters: RF000[bloom] <- c_customer_sk
 |  mem-estimate=8.50MB mem-reservation=8.50MB spill-buffer=512.00KB
 |  tuple-ids=0N,1 row-size=355B cardinality=529700
 |
@@ -103,7 +103,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_customer_sk
+   runtime filters: RF000[bloom] -> ss_customer_sk
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -124,7 +124,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
 |  fk/pk conjuncts: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
-|  runtime filters: RF000 <- sr_item_sk, RF001 <- sr_ticket_number
+|  runtime filters: RF000[bloom] <- sr_item_sk, RF001[bloom] <- sr_ticket_number
 |  mem-estimate=4.75MB mem-reservation=4.75MB spill-buffer=256.00KB
 |  tuple-ids=0,1 row-size=188B cardinality=211838
 |
@@ -140,7 +140,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_item_sk, RF001 -> ss_ticket_number
+   runtime filters: RF000[bloom] -> ss_item_sk, RF001[bloom] -> ss_ticket_number
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -160,7 +160,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_sold_time_sk = ws_sold_time_sk
 |  fk/pk conjuncts: none
-|  runtime filters: RF000 <- ws_sold_time_sk
+|  runtime filters: RF000[bloom] <- ws_sold_time_sk
 |  mem-estimate=108.67MB mem-reservation=34.00MB spill-buffer=2.00MB
 |  tuple-ids=0,1 row-size=244B cardinality=44136418
 |
@@ -174,7 +174,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_sold_time_sk
+   runtime filters: RF000[bloom] -> ss_sold_time_sk
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -195,7 +195,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: b.d_date_sk = a.d_date_sk
 |  fk/pk conjuncts: b.d_date_sk = a.d_date_sk
-|  runtime filters: RF000 <- a.d_date_sk
+|  runtime filters: RF000[bloom] <- a.d_date_sk
 |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB
 |  tuple-ids=1,0 row-size=606B cardinality=36525
 |
@@ -211,7 +211,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [tpcds.date_dim b]
    partitions=1/1 files=1 size=9.84MB
-   runtime filters: RF000 -> b.d_date_sk
+   runtime filters: RF000[bloom] -> b.d_date_sk
    stats-rows=73049 extrapolated-rows=disabled
    table stats: rows=73049 size=9.84MB
    column stats: all
@@ -236,7 +236,7 @@ PLAN-ROOT SINK
 08:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_addr_sk = c_current_addr_sk
 |  fk/pk conjuncts: none
-|  runtime filters: RF000 <- c_current_addr_sk
+|  runtime filters: RF000[bloom] <- c_current_addr_sk
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3,4,2 row-size=60B cardinality=19358
 |
@@ -251,7 +251,7 @@ PLAN-ROOT SINK
 07:HASH JOIN [INNER JOIN]
 |  hash predicates: sr_returned_date_sk = d2.d_date_sk
 |  fk/pk conjuncts: sr_returned_date_sk = d2.d_date_sk
-|  runtime filters: RF001 <- d2.d_date_sk
+|  runtime filters: RF002[bloom] <- d2.d_date_sk
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3,4 row-size=56B cardinality=8131
 |
@@ -266,14 +266,14 @@ PLAN-ROOT SINK
 06:HASH JOIN [INNER JOIN]
 |  hash predicates: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
 |  fk/pk conjuncts: sr_item_sk = ss_item_sk, sr_ticket_number = ss_ticket_number
-|  runtime filters: RF002 <- ss_item_sk, RF003 <- ss_ticket_number
+|  runtime filters: RF004[bloom] <- ss_item_sk, RF005[bloom] <- ss_ticket_number
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=1,0,3 row-size=52B cardinality=8131
 |
 |--05:HASH JOIN [INNER JOIN]
 |  |  hash predicates: ss_sold_date_sk = d1.d_date_sk
 |  |  fk/pk conjuncts: ss_sold_date_sk = d1.d_date_sk
-|  |  runtime filters: RF004 <- d1.d_date_sk
+|  |  runtime filters: RF008[bloom] <- d1.d_date_sk
 |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  |  tuple-ids=0,3 row-size=32B cardinality=11055
 |  |
@@ -289,7 +289,7 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [tpcds.store_sales]
 |     partitions=1824/1824 files=1824 size=326.32MB
-|     runtime filters: RF000 -> ss_addr_sk, RF004 -> ss_sold_date_sk
+|     runtime filters: RF000[bloom] -> ss_addr_sk, RF008[bloom] -> ss_sold_date_sk
 |     stats-rows=2880404 extrapolated-rows=disabled
 |     table stats: rows=2880404 size=326.32MB
 |     column stats: all
@@ -298,7 +298,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [tpcds.store_returns]
    partitions=1/1 files=1 size=31.19MB
-   runtime filters: RF001 -> sr_returned_date_sk, RF002 -> sr_item_sk, RF003 -> sr_ticket_number
+   runtime filters: RF002[bloom] -> sr_returned_date_sk, RF004[bloom] -> sr_item_sk, RF005[bloom] -> sr_ticket_number
    stats-rows=287514 extrapolated-rows=disabled
    table stats: rows=287514 size=31.19MB
    column stats: all
@@ -318,7 +318,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_customer_sk % 10 = c_customer_sk / 100
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF000 <- c_customer_sk / 100
+|  runtime filters: RF000[bloom] <- c_customer_sk / 100
 |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB
 |  tuple-ids=0,1 row-size=355B cardinality=2880404
 |
@@ -332,7 +332,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_customer_sk % 10
+   runtime filters: RF000[bloom] -> ss_customer_sk % 10
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -353,7 +353,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_customer_sk = c_customer_sk
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF000 <- c_customer_sk
+|  runtime filters: RF000[bloom] <- c_customer_sk
 |  mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB
 |  tuple-ids=0,1 row-size=8B cardinality=2880404
 |
@@ -367,7 +367,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_customer_sk
+   runtime filters: RF000[bloom] -> ss_customer_sk
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all
@@ -387,7 +387,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_customer_sk = c_customer_sk
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF000 <- c_customer_sk
+|  runtime filters: RF000[bloom] <- c_customer_sk
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=0,1 row-size=8B cardinality=unavailable
 |
@@ -401,7 +401,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds_seq_snap.store_sales]
    partitions=1824/1824 files=1824 size=207.85MB
-   runtime filters: RF000 -> ss_customer_sk
+   runtime filters: RF000[bloom] -> ss_customer_sk
    stats-rows=unavailable extrapolated-rows=disabled
    table stats: rows=unavailable size=unavailable
    column stats: unavailable
@@ -423,7 +423,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: ss_sold_time_sk = ws_sold_time_sk
 |  fk/pk conjuncts: none
-|  runtime filters: RF000 <- ws_sold_time_sk
+|  runtime filters: RF000[bloom] <- ws_sold_time_sk
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB
 |  tuple-ids=0,2 row-size=104B cardinality=2440073
 |
@@ -442,7 +442,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [tpcds.store_sales]
    partitions=1824/1824 files=1824 size=326.32MB
-   runtime filters: RF000 -> ss_sold_time_sk
+   runtime filters: RF000[bloom] -> ss_sold_time_sk
    stats-rows=2880404 extrapolated-rows=disabled
    table stats: rows=2880404 size=326.32MB
    column stats: all

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
index e5aa5e8..184fd1c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
@@ -193,14 +193,14 @@ PLAN-ROOT SINK
 |  |
 |  |--04:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: b.id = c.id
-|  |  |  runtime filters: RF001 <- c.id
+|  |  |  runtime filters: RF002 <- c.id
 |  |  |
 |  |  |--02:SCAN HDFS [functional.alltypestiny c]
 |  |  |     partitions=4/4 files=4 size=460B
 |  |  |
 |  |  01:SCAN HDFS [functional.alltypes b]
 |  |     partitions=24/24 files=24 size=478.45KB
-|  |     runtime filters: RF001 -> b.id
+|  |     runtime filters: RF002 -> b.id
 |  |
 |  00:SCAN HDFS [functional.alltypestiny a]
 |     partitions=4/4 files=4 size=460B

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2510fe0a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
index 556ba65..c75c02d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
@@ -391,7 +391,7 @@ PLAN-ROOT SINK
 |
 |--03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.id = functional.alltypestiny.id
-|  |  runtime filters: RF001 <- functional.alltypestiny.id
+|  |  runtime filters: RF002 <- functional.alltypestiny.id
 |  |  limit: 10
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
@@ -399,7 +399,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional.alltypessmall a]
 |     partitions=4/4 files=4 size=6.32KB
-|     runtime filters: RF001 -> a.id
+|     runtime filters: RF002 -> a.id
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -426,7 +426,7 @@ PLAN-ROOT SINK
 |  |
 |  03:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  hash predicates: a.id = functional.alltypestiny.id
-|  |  runtime filters: RF001 <- functional.alltypestiny.id
+|  |  runtime filters: RF002 <- functional.alltypestiny.id
 |  |  limit: 10
 |  |
 |  |--06:EXCHANGE [BROADCAST]
@@ -436,7 +436,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional.alltypessmall a]
 |     partitions=4/4 files=4 size=6.32KB
-|     runtime filters: RF001 -> a.id
+|     runtime filters: RF002 -> a.id
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -470,14 +470,14 @@ PLAN-ROOT SINK
 |  |
 |  03:HASH JOIN [INNER JOIN]
 |  |  hash predicates: a.id = functional.alltypestiny.id
-|  |  runtime filters: RF001 <- functional.alltypestiny.id
+|  |  runtime filters: RF002 <- functional.alltypestiny.id
 |  |
 |  |--02:SCAN HDFS [functional.alltypestiny]
 |  |     partitions=4/4 files=4 size=460B
 |  |
 |  01:SCAN HDFS [functional.alltypessmall a]
 |     partitions=4/4 files=4 size=6.32KB
-|     runtime filters: RF001 -> a.id
+|     runtime filters: RF002 -> a.id
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -508,7 +508,7 @@ PLAN-ROOT SINK
 |  |
 |  03:HASH JOIN [INNER JOIN, BROADCAST]
 |  |  hash predicates: a.id = functional.alltypestiny.id
-|  |  runtime filters: RF001 <- functional.alltypestiny.id
+|  |  runtime filters: RF002 <- functional.alltypestiny.id
 |  |
 |  |--07:EXCHANGE [BROADCAST]
 |  |  |
@@ -517,7 +517,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional.alltypessmall a]
 |     partitions=4/4 files=4 size=6.32KB
-|     runtime filters: RF001 -> a.id
+|     runtime filters: RF002 -> a.id
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB