You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/13 06:09:34 UTC

[03/10] incubator-impala git commit: IMPALA-3480: Add query options for min/max filter sizes

IMPALA-3480: Add query options for min/max filter sizes

This patch adds two query options for runtime filters:

  RUNTIME_FILTER_MAX_SIZE
  RUNTIME_FILTER_MIN_SIZE

These options bound the size of every runtime filter: whatever size the
planner's estimates call for is clamped to the range
[RUNTIME_FILTER_MIN_SIZE, RUNTIME_FILTER_MAX_SIZE]. Filter sizes are
rounded up to a power of two.

Change-Id: I5c13c200a0f1855f38a5da50ca34a737e741868b
Reviewed-on: http://gerrit.cloudera.org:8080/2966
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/df1412c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/df1412c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/df1412c9

Branch: refs/heads/master
Commit: df1412c962945fe6e69591e80354fad692413ba3
Parents: 14cdb04
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu May 5 10:00:29 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:35 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc                  |  6 +--
 be/src/exec/hash-table.h                        |  2 +-
 be/src/exec/nested-loop-join-node.cc            | 12 ++---
 be/src/exec/partitioned-aggregation-node.cc     |  2 +-
 be/src/runtime/runtime-filter.cc                | 34 ++++++++----
 be/src/runtime/runtime-filter.h                 | 14 +++--
 be/src/service/query-options-test.cc            | 56 +++++++++++---------
 be/src/service/query-options.cc                 | 19 +++++--
 be/src/service/query-options.h                  |  6 ++-
 be/src/util/bit-util.h                          |  7 ++-
 be/src/util/debug-util.cc                       |  5 +-
 be/src/util/debug-util.h                        |  1 +
 be/src/util/fixed-size-hash-table.h             |  4 +-
 be/src/util/parse-util-test.cc                  | 12 ++++-
 be/src/util/parse-util.cc                       |  6 +++
 be/src/util/parse-util.h                        |  1 +
 common/thrift/ImpalaInternalService.thrift      |  6 +++
 common/thrift/ImpalaService.thrift              |  6 +++
 .../queries/QueryTest/runtime_filters.test      | 46 ++++++++++++++++
 .../queries/QueryTest/runtime_filters_wait.test |  1 +
 20 files changed, 183 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index bef3f1e..25cd4f1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -189,7 +189,7 @@ class HashTableTest : public testing::Test {
         &tracker_, runtime_state_, &client).ok());
 
     // Initial_num_buckets must be a power of two.
-    EXPECT_EQ(initial_num_buckets, BitUtil::NextPowerOfTwo(initial_num_buckets));
+    EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
     int64_t max_num_buckets = 1L << 31;
     table->reset(new HashTable(quadratic, runtime_state_, client, 1, NULL,
           max_num_buckets, initial_num_buckets));
@@ -354,12 +354,12 @@ class HashTableTest : public testing::Test {
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
 
     // Resize and try again.
-    int target_size = BitUtil::NextPowerOfTwo(2 * total_rows);
+    int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
     ResizeTable(hash_table.get(), target_size, &ht_ctx);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
 
-    target_size = BitUtil::NextPowerOfTwo(total_rows + 1);
+    target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
     ResizeTable(hash_table.get(), target_size, &ht_ctx);
     EXPECT_EQ(hash_table->num_buckets(), target_size);
     ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index e496cde..3d090a8 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -423,7 +423,7 @@ class HashTable {
   /// rounded up to a power of two, and also assumes that there are no duplicates.
   static int64_t EstimateNumBuckets(int64_t num_rows) {
     /// Assume max 66% fill factor and no duplicates.
-    return BitUtil::NextPowerOfTwo(3 * num_rows / 2);
+    return BitUtil::RoundUpToPowerOfTwo(3 * num_rows / 2);
   }
   static int64_t EstimateSize(int64_t num_rows) {
     int64_t num_buckets = EstimateNumBuckets(num_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 5789534..646c089 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -316,7 +316,7 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
     RowBatch* output_batch) {
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -361,7 +361,7 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
     RowBatch* output_batch) {
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -414,7 +414,7 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
     DCHECK(HasValidProbeRow());
@@ -471,7 +471,7 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
   ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
   size_t num_join_ctxs = join_conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_ && HasMoreProbeRows()) {
     DCHECK(HasValidProbeRow());
@@ -557,7 +557,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
   size_t num_ctxs = conjunct_ctxs_.size();
   DCHECK(matching_build_rows_ != NULL);
 
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   while (!build_row_iterator_.AtEnd()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query
     // maintenance every N iterations.
@@ -612,7 +612,7 @@ Status NestedLoopJoinNode::FindBuildMatches(
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
   size_t num_ctxs = conjunct_ctxs_.size();
 
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   while (!build_row_iterator_.AtEnd()) {
     DCHECK(current_probe_row_ != NULL);
     TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index a1fef0f..0bf51a9 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -432,7 +432,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
 
   SCOPED_TIMER(get_results_timer_);
   int count = 0;
-  const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   // Keeping returning rows from the current partition.
   while (!output_iterator_.AtEnd()) {
     // This loop can go on for a long time if the conjuncts are very selective. Do query

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 2659125..7616320 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -32,8 +32,8 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o
 
 const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
 
-const int32_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int32_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
 RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
     : query_ctx_(query_ctx), state_(state), closed_(false) {
@@ -41,10 +41,26 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
       state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
 
   // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
-  int32_t bloom_filter_size = query_ctx_.request.query_options.runtime_bloom_filter_size;
-  bloom_filter_size = std::max(bloom_filter_size, MIN_BLOOM_FILTER_SIZE);
-  bloom_filter_size = std::min(bloom_filter_size, MAX_BLOOM_FILTER_SIZE);
-  default_log_filter_size_ = Bits::Log2Ceiling64(bloom_filter_size);
+  max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
+  max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  max_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
+  min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+  min_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+  // Make sure that min <= max
+  min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+  DCHECK_GT(min_filter_size_, 0);
+  DCHECK_GT(max_filter_size_, 0);
+
+  default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
+  default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+  default_filter_size_ =
+      BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -172,11 +188,11 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
 }
 
 int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
-  if (ndv == -1) return 1LL << default_log_filter_size_;
+  if (ndv == -1) return default_filter_size_;
   int64_t required_space =
       1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
-  if (required_space > MAX_BLOOM_FILTER_SIZE) required_space = MAX_BLOOM_FILTER_SIZE;
-  if (required_space < MIN_BLOOM_FILTER_SIZE) required_space = MIN_BLOOM_FILTER_SIZE;
+  required_space = max<int64_t>(required_space, min_filter_size_);
+  required_space = min<int64_t>(required_space, max_filter_size_);
   return required_space;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 50c77b0..178c03f 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -100,8 +100,8 @@ class RuntimeFilterBank {
   /// Releases all memory allocated for BloomFilters.
   void Close();
 
-  static const int32_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
-  static const int32_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
+  static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024;           // 4KB
+  static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;   // 16MB
 
  private:
   /// Returns the the space (in bytes) required for a filter to achieve the configured
@@ -136,8 +136,14 @@ class RuntimeFilterBank {
   /// Total amount of memory allocated to Bloom Filters
   RuntimeProfile::Counter* memory_allocated_;
 
-  /// Precomputed logarithm of default BloomFilter heap size.
-  int default_log_filter_size_;
+  /// Precomputed default BloomFilter size.
+  int64_t default_filter_size_;
+
+  /// Maximum filter size, in bytes, rounded up to a power of two.
+  int64_t max_filter_size_;
+
+  /// Minimum filter size, in bytes, rounded up to a power of two.
+  int64_t min_filter_size_;
 };
 
 /// RuntimeFilters represent set-membership predicates (implemented with bloom filters)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 83a5770..53767d9 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -29,30 +29,38 @@ using namespace std;
 
 TEST(QueryOptions, SetBloomSize) {
   TQueryOptions options;
-
-  // The upper and lower bound of the allowed values:
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-     lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options, NULL)
-     .ok());
-
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options, NULL)
-      .ok());
-
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
-  EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
-      lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
-  EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
-  // Parsing memory values works in a reasonable way:
-  EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "1MB", &options, NULL));
-  EXPECT_EQ(1 << 20, options.runtime_bloom_filter_size);
-
-  // Bloom filters cannot occupy a percentage of memory:
-  EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "10%", &options, NULL).ok());
+  vector<pair<string, int*>> option_list = {
+    {"RUNTIME_BLOOM_FILTER_SIZE", &options.runtime_bloom_filter_size},
+    {"RUNTIME_FILTER_MAX_SIZE", &options.runtime_filter_max_size},
+    {"RUNTIME_FILTER_MIN_SIZE", &options.runtime_filter_min_size}};
+  for (const auto& option: option_list) {
+
+    // The upper and lower bound of the allowed values:
+    EXPECT_FALSE(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options,
+        NULL)
+        .ok());
+
+    EXPECT_FALSE(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options,
+        NULL)
+        .ok());
+
+    EXPECT_OK(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
+    EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, *option.second);
+
+    EXPECT_OK(SetQueryOption(option.first,
+        lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
+    EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, *option.second);
+
+    // Parsing memory values works in a reasonable way:
+    EXPECT_OK(SetQueryOption(option.first, "1MB", &options, NULL));
+    EXPECT_EQ(1 << 20, *option.second);
+
+    // Bloom filters cannot occupy a percentage of memory:
+    EXPECT_FALSE(SetQueryOption(option.first, "10%", &options, NULL).ok());
+  }
 }
 
 TEST(QueryOptions, SetFilterWait) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ce538bf..4617256 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -325,17 +325,26 @@ Status impala::SetQueryOption(const string& key, const string& value,
               " OFF(0), LOCAL(1) or GLOBAL(2).", value));
         }
         break;
+      case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE:
+      case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE:
       case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
         int64_t size;
         RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size));
         if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE ||
             size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
-          return Status(Substitute(
-              "$0 is not a valid Bloom filter size. Valid sizes are in [$1, $2].", value,
-              RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
-              RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+          return Status(Substitute("$0 is not a valid Bloom filter size for $1. "
+                  "Valid sizes are in [$2, $3].", value, PrintTImpalaQueryOptions(
+                      static_cast<TImpalaQueryOptions::type>(option)),
+                  RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
+                  RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+        }
+        if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
+          query_options->__set_runtime_bloom_filter_size(size);
+        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {
+          query_options->__set_runtime_filter_min_size(size);
+        } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) {
+          query_options->__set_runtime_filter_max_size(size);
         }
-        query_options->__set_runtime_bloom_filter_size(size);
         break;
       }
       case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 56e2e1a..6f4c214 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\
+      TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -77,7 +77,9 @@ class TQueryOptions;
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
   QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
-  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING);
+  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
+  QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
+  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index e255f0c..eed6df3 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -51,11 +51,10 @@ class BitUtil {
     return (value / factor) * factor;
   }
 
-  /// Returns the smallest power of two that contains v. Taken from
+  /// Returns the smallest power of two that contains v. If v is a power of two, v is
+  /// returned. Taken from
   /// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
-  /// TODO: Pick a better name, as it is not clear what happens when the input is
-  /// already a power of two.
-  static inline int64_t NextPowerOfTwo(int64_t v) {
+  static inline int64_t RoundUpToPowerOfTwo(int64_t v) {
     --v;
     v |= v >> 1;
     v |= v >> 2;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 7e6a290..4cf606b 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -56,7 +56,8 @@ namespace impala {
 // Macro to stamp out operator<< for thrift enums.  Why doesn't thrift do this?
 #define THRIFT_ENUM_OUTPUT_FN(E) THRIFT_ENUM_OUTPUT_FN_IMPL(E , _##E##_VALUES_TO_NAMES)
 
-// Macro to implement Print function that returns string for thrift enums
+// Macro to implement Print function that returns string for thrift enums. Make sure you
+// define a corresponding THRIFT_ENUM_OUTPUT_FN.
 #define THRIFT_ENUM_PRINT_FN(E) \
   string Print##E(const E::type& e) {\
     stringstream ss;\
@@ -78,6 +79,7 @@ THRIFT_ENUM_OUTPUT_FN(CompressionCodec);
 THRIFT_ENUM_OUTPUT_FN(Type);
 THRIFT_ENUM_OUTPUT_FN(TMetricKind);
 THRIFT_ENUM_OUTPUT_FN(TUnit);
+THRIFT_ENUM_OUTPUT_FN(TImpalaQueryOptions);
 
 THRIFT_ENUM_PRINT_FN(TCatalogObjectType);
 THRIFT_ENUM_PRINT_FN(TDdlType);
@@ -88,6 +90,7 @@ THRIFT_ENUM_PRINT_FN(QueryState);
 THRIFT_ENUM_PRINT_FN(Encoding);
 THRIFT_ENUM_PRINT_FN(TMetricKind);
 THRIFT_ENUM_PRINT_FN(TUnit);
+THRIFT_ENUM_PRINT_FN(TImpalaQueryOptions);
 
 
 ostream& operator<<(ostream& os, const TUniqueId& id) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 6872e66..c9550dc 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -69,6 +69,7 @@ std::string PrintEncoding(const parquet::Encoding::type& type);
 std::string PrintAsHex(const char* bytes, int64_t len);
 std::string PrintTMetricKind(const TMetricKind::type& type);
 std::string PrintTUnit(const TUnit::type& type);
+std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
 /// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
 std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
 /// Returns the numeric path without column/field names, e.g. "[0,1,2]"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/fixed-size-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/util/fixed-size-hash-table.h b/be/src/util/fixed-size-hash-table.h
index 8ecb328..828769e 100644
--- a/be/src/util/fixed-size-hash-table.h
+++ b/be/src/util/fixed-size-hash-table.h
@@ -46,8 +46,8 @@ class FixedSizeHashTable {
     DCHECK_GT(min_capacity, 0);
     // Capacity cannot be greater than largest uint32_t power of two.
     capacity_ = static_cast<uint32_t>(std::min(static_cast<int64_t>(1) << 31,
-        BitUtil::NextPowerOfTwo(min_capacity)));
-    DCHECK_EQ(capacity_, BitUtil::NextPowerOfTwo(capacity_));
+        BitUtil::RoundUpToPowerOfTwo(min_capacity)));
+    DCHECK_EQ(capacity_, BitUtil::RoundUpToPowerOfTwo(capacity_));
     if (tbl_ != NULL) free(tbl_);
     int64_t tbl_byte_size = capacity_ * sizeof(Entry);
     tbl_ = reinterpret_cast<Entry*>(malloc(tbl_byte_size));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc
index a6726fd..bd49371 100644
--- a/be/src/util/parse-util-test.cc
+++ b/be/src/util/parse-util-test.cc
@@ -31,7 +31,8 @@ TEST(ParseMemSpecs, Basic) {
   bool is_percent;
   int64_t bytes;
 
-  int64_t megabytes = 1024 * 1024;
+  int64_t kilobytes = 1024;
+  int64_t megabytes = 1024 * kilobytes;
   int64_t gigabytes = 1024 * megabytes;
 
   bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem());
@@ -42,6 +43,14 @@ TEST(ParseMemSpecs, Basic) {
   ASSERT_EQ(100, bytes);
   ASSERT_FALSE(is_percent);
 
+  bytes = ParseUtil::ParseMemSpec("100kb", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(100 * 1024, bytes);
+  ASSERT_FALSE(is_percent);
+
+  bytes = ParseUtil::ParseMemSpec("5KB", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(5 * 1024, bytes);
+  ASSERT_FALSE(is_percent);
+
   bytes = ParseUtil::ParseMemSpec("4MB", &is_percent, MemInfo::physical_mem());
   ASSERT_EQ(4 * megabytes, bytes);
   ASSERT_FALSE(is_percent);
@@ -77,6 +86,7 @@ TEST(ParseMemSpecs, Basic) {
   bad_values.push_back("gb");
   bad_values.push_back("1GMb");
   bad_values.push_back("1b1Mb");
+  bad_values.push_back("1kib");
   bad_values.push_back("1Bb");
   bad_values.push_back("1%%");
   bad_values.push_back("1.1");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index 60cf3ac..1a0af85 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -49,6 +49,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
       number_str_len--;
       multiplier = 1024L * 1024L;
       break;
+    case 'k':
+    case 'K':
+      // Kilobytes
+      number_str_len--;
+      multiplier = 1024L;
+      break;
     case '%':
       // Don't allow a suffix of "%B".
       if (suffix_char != mem_spec_str.rbegin()) return -1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h
index 5b13137..02ce120 100644
--- a/be/src/util/parse-util.h
+++ b/be/src/util/parse-util.h
@@ -27,6 +27,7 @@ class ParseUtil {
   /// Sets *is_percent to indicate whether the given spec is in percent.
   /// Accepted formats:
   /// '<int>[bB]?'  -> bytes (default if no unit given)
+  /// '<float>[kK(bB)]' -> kilobytes
   /// '<float>[mM(bB)]' -> megabytes
   /// '<float>[gG(bB)]' -> in gigabytes
   /// '<int>%'      -> in percent of relative_reference

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 611155c..b9892e6 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -192,6 +192,12 @@ struct TQueryOptions {
   // those queries, the coordinator deletes all files in the final location before copying
   // the files there.
   45: optional bool s3_skip_insert_staging = true
+
+  // Minimum runtime filter size, in bytes
+  46: optional i32 runtime_filter_min_size = 1048576
+
+  // Maximum runtime filter size, in bytes
+  47: optional i32 runtime_filter_max_size = 16777216
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0a030ad..68647df 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,6 +219,12 @@ enum TImpalaQueryOptions {
   // the files there.
   // TODO: Find a way to get this working for INSERT OVERWRITEs too.
   S3_SKIP_INSERT_STAGING
+
+  // Maximum runtime filter size, in bytes.
+  RUNTIME_FILTER_MAX_SIZE,
+
+  // Minimum runtime filter size, in bytes.
+  RUNTIME_FILTER_MIN_SIZE
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 4432810..2d32064 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -259,6 +259,7 @@ row_regex: .*RowsReturned: 2.43K .*
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=15000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4K;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [BROADCAST]
     # Build-side needs to be sufficiently large to trigger FP check.
@@ -335,6 +336,7 @@ select STRAIGHT_JOIN count(a.id) from alltypes a
 ####################################################
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1) b on a.l_orderkey = -b.l_orderkey;
@@ -347,6 +349,7 @@ row_regex: .*Filter 0 \(4.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
@@ -359,6 +362,7 @@ row_regex: .*Filter 0 \(256.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
@@ -371,6 +375,7 @@ row_regex: .*Filter 0 \(512.00 KB\).*
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
 with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;
@@ -380,3 +385,44 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 row_regex: .*1 of 1 Runtime Filter Published.*
 row_regex: .*Filter 0 \(1.00 MB\).*
 ====
+
+
+---- QUERY
+####################################################
+# Test case 16: Filter sizes respect query options
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=8KB;
+SET RUNTIME_FILTER_MAX_SIZE=8KB;
+# This query would produce a 4KB filter without setting the minimum size.
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+# Check that filter sizes are rounded up to power-of-two
+SET RUNTIME_FILTER_MIN_SIZE=6000B;
+SET RUNTIME_FILTER_MAX_SIZE=6000B;
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MAX_SIZE=8192;
+# Query would produce a 512KB filter without setting the max
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+    join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
+---- RUNTIME_PROFILE
+row_regex: .*0 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
index 324eb1c..4743f3e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
@@ -24,6 +24,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 
 SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4096;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [BROADCAST]
     # Build-side needs to be sufficiently large to trigger FP check.