You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/13 06:09:34 UTC
[03/10] incubator-impala git commit: IMPALA-3480: Add query options for min/max filter sizes
IMPALA-3480: Add query options for min/max filter sizes
This patch adds two query options for runtime filters:
RUNTIME_FILTER_MAX_SIZE
RUNTIME_FILTER_MIN_SIZE
These options set lower and upper bounds on the size of a runtime filter,
regardless of the size that the planner's estimates would otherwise produce.
Filter sizes are rounded up to the nearest power of two.
Change-Id: I5c13c200a0f1855f38a5da50ca34a737e741868b
Reviewed-on: http://gerrit.cloudera.org:8080/2966
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/df1412c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/df1412c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/df1412c9
Branch: refs/heads/master
Commit: df1412c962945fe6e69591e80354fad692413ba3
Parents: 14cdb04
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu May 5 10:00:29 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:35 2016 -0700
----------------------------------------------------------------------
be/src/exec/hash-table-test.cc | 6 +--
be/src/exec/hash-table.h | 2 +-
be/src/exec/nested-loop-join-node.cc | 12 ++---
be/src/exec/partitioned-aggregation-node.cc | 2 +-
be/src/runtime/runtime-filter.cc | 34 ++++++++----
be/src/runtime/runtime-filter.h | 14 +++--
be/src/service/query-options-test.cc | 56 +++++++++++---------
be/src/service/query-options.cc | 19 +++++--
be/src/service/query-options.h | 6 ++-
be/src/util/bit-util.h | 7 ++-
be/src/util/debug-util.cc | 5 +-
be/src/util/debug-util.h | 1 +
be/src/util/fixed-size-hash-table.h | 4 +-
be/src/util/parse-util-test.cc | 12 ++++-
be/src/util/parse-util.cc | 6 +++
be/src/util/parse-util.h | 1 +
common/thrift/ImpalaInternalService.thrift | 6 +++
common/thrift/ImpalaService.thrift | 6 +++
.../queries/QueryTest/runtime_filters.test | 46 ++++++++++++++++
.../queries/QueryTest/runtime_filters_wait.test | 1 +
20 files changed, 183 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index bef3f1e..25cd4f1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -189,7 +189,7 @@ class HashTableTest : public testing::Test {
&tracker_, runtime_state_, &client).ok());
// Initial_num_buckets must be a power of two.
- EXPECT_EQ(initial_num_buckets, BitUtil::NextPowerOfTwo(initial_num_buckets));
+ EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
int64_t max_num_buckets = 1L << 31;
table->reset(new HashTable(quadratic, runtime_state_, client, 1, NULL,
max_num_buckets, initial_num_buckets));
@@ -354,12 +354,12 @@ class HashTableTest : public testing::Test {
ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
// Resize and try again.
- int target_size = BitUtil::NextPowerOfTwo(2 * total_rows);
+ int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
ResizeTable(hash_table.get(), target_size, &ht_ctx);
EXPECT_EQ(hash_table->num_buckets(), target_size);
ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
- target_size = BitUtil::NextPowerOfTwo(total_rows + 1);
+ target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
ResizeTable(hash_table.get(), target_size, &ht_ctx);
EXPECT_EQ(hash_table->num_buckets(), target_size);
ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index e496cde..3d090a8 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -423,7 +423,7 @@ class HashTable {
/// rounded up to a power of two, and also assumes that there are no duplicates.
static int64_t EstimateNumBuckets(int64_t num_rows) {
/// Assume max 66% fill factor and no duplicates.
- return BitUtil::NextPowerOfTwo(3 * num_rows / 2);
+ return BitUtil::RoundUpToPowerOfTwo(3 * num_rows / 2);
}
static int64_t EstimateSize(int64_t num_rows) {
int64_t num_buckets = EstimateNumBuckets(num_rows);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 5789534..646c089 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -316,7 +316,7 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
RowBatch* output_batch) {
ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
size_t num_join_ctxs = join_conjunct_ctxs_.size();
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
@@ -361,7 +361,7 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
RowBatch* output_batch) {
ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
size_t num_join_ctxs = join_conjunct_ctxs_.size();
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
@@ -414,7 +414,7 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
size_t num_join_ctxs = join_conjunct_ctxs_.size();
DCHECK(matching_build_rows_ != NULL);
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_) {
DCHECK(HasValidProbeRow());
@@ -471,7 +471,7 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
size_t num_join_ctxs = join_conjunct_ctxs_.size();
DCHECK(matching_build_rows_ != NULL);
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!eos_ && HasMoreProbeRows()) {
DCHECK(HasValidProbeRow());
@@ -557,7 +557,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
size_t num_ctxs = conjunct_ctxs_.size();
DCHECK(matching_build_rows_ != NULL);
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!build_row_iterator_.AtEnd()) {
// This loop can go on for a long time if the conjuncts are very selective. Do query
// maintenance every N iterations.
@@ -612,7 +612,7 @@ Status NestedLoopJoinNode::FindBuildMatches(
ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
size_t num_ctxs = conjunct_ctxs_.size();
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
while (!build_row_iterator_.AtEnd()) {
DCHECK(current_probe_row_ != NULL);
TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index a1fef0f..0bf51a9 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -432,7 +432,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
SCOPED_TIMER(get_results_timer_);
int count = 0;
- const int N = BitUtil::NextPowerOfTwo(state->batch_size());
+ const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
// Keeping returning rows from the current partition.
while (!output_iterator_.AtEnd()) {
// This loop can go on for a long time if the conjuncts are very selective. Do query
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 2659125..7616320 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -32,8 +32,8 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability o
const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
-const int32_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int32_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
: query_ctx_(query_ctx), state_(state), closed_(false) {
@@ -41,10 +41,26 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
// Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
- int32_t bloom_filter_size = query_ctx_.request.query_options.runtime_bloom_filter_size;
- bloom_filter_size = std::max(bloom_filter_size, MIN_BLOOM_FILTER_SIZE);
- bloom_filter_size = std::min(bloom_filter_size, MAX_BLOOM_FILTER_SIZE);
- default_log_filter_size_ = Bits::Log2Ceiling64(bloom_filter_size);
+ max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
+ max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+ max_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+ min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
+ min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+ min_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+ // Make sure that min <= max
+ min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+ DCHECK_GT(min_filter_size_, 0);
+ DCHECK_GT(max_filter_size_, 0);
+
+ default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
+ default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+ default_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
}
RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -172,11 +188,11 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
}
int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
- if (ndv == -1) return 1LL << default_log_filter_size_;
+ if (ndv == -1) return default_filter_size_;
int64_t required_space =
1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
- if (required_space > MAX_BLOOM_FILTER_SIZE) required_space = MAX_BLOOM_FILTER_SIZE;
- if (required_space < MIN_BLOOM_FILTER_SIZE) required_space = MIN_BLOOM_FILTER_SIZE;
+ required_space = max<int64_t>(required_space, min_filter_size_);
+ required_space = min<int64_t>(required_space, max_filter_size_);
return required_space;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 50c77b0..178c03f 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -100,8 +100,8 @@ class RuntimeFilterBank {
/// Releases all memory allocated for BloomFilters.
void Close();
- static const int32_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB
- static const int32_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB
+ static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB
+ static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB
private:
/// Returns the the space (in bytes) required for a filter to achieve the configured
@@ -136,8 +136,14 @@ class RuntimeFilterBank {
/// Total amount of memory allocated to Bloom Filters
RuntimeProfile::Counter* memory_allocated_;
- /// Precomputed logarithm of default BloomFilter heap size.
- int default_log_filter_size_;
+ /// Precomputed default BloomFilter size.
+ int64_t default_filter_size_;
+
+ /// Maximum filter size, in bytes, rounded up to a power of two.
+ int64_t max_filter_size_;
+
+ /// Minimum filter size, in bytes, rounded up to a power of two.
+ int64_t min_filter_size_;
};
/// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 83a5770..53767d9 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -29,30 +29,38 @@ using namespace std;
TEST(QueryOptions, SetBloomSize) {
TQueryOptions options;
-
- // The upper and lower bound of the allowed values:
- EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
- lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options, NULL)
- .ok());
-
- EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
- lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options, NULL)
- .ok());
-
- EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
- lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
- EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
- EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE",
- lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
- EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, options.runtime_bloom_filter_size);
-
- // Parsing memory values works in a reasonable way:
- EXPECT_OK(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "1MB", &options, NULL));
- EXPECT_EQ(1 << 20, options.runtime_bloom_filter_size);
-
- // Bloom filters cannot occupy a percentage of memory:
- EXPECT_FALSE(SetQueryOption("RUNTIME_BLOOM_FILTER_SIZE", "10%", &options, NULL).ok());
+ vector<pair<string, int*>> option_list = {
+ {"RUNTIME_BLOOM_FILTER_SIZE", &options.runtime_bloom_filter_size},
+ {"RUNTIME_FILTER_MAX_SIZE", &options.runtime_filter_max_size},
+ {"RUNTIME_FILTER_MIN_SIZE", &options.runtime_filter_min_size}};
+ for (const auto& option: option_list) {
+
+ // The upper and lower bound of the allowed values:
+ EXPECT_FALSE(SetQueryOption(option.first,
+ lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE - 1), &options,
+ NULL)
+ .ok());
+
+ EXPECT_FALSE(SetQueryOption(option.first,
+ lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE + 1), &options,
+ NULL)
+ .ok());
+
+ EXPECT_OK(SetQueryOption(option.first,
+ lexical_cast<string>(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE), &options, NULL));
+ EXPECT_EQ(RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE, *option.second);
+
+ EXPECT_OK(SetQueryOption(option.first,
+ lexical_cast<string>(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE), &options, NULL));
+ EXPECT_EQ(RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE, *option.second);
+
+ // Parsing memory values works in a reasonable way:
+ EXPECT_OK(SetQueryOption(option.first, "1MB", &options, NULL));
+ EXPECT_EQ(1 << 20, *option.second);
+
+ // Bloom filters cannot occupy a percentage of memory:
+ EXPECT_FALSE(SetQueryOption(option.first, "10%", &options, NULL).ok());
+ }
}
TEST(QueryOptions, SetFilterWait) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ce538bf..4617256 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -325,17 +325,26 @@ Status impala::SetQueryOption(const string& key, const string& value,
" OFF(0), LOCAL(1) or GLOBAL(2).", value));
}
break;
+ case TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE:
+ case TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE:
case TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE: {
int64_t size;
RETURN_IF_ERROR(ParseMemValue(value, "Bloom filter size", &size));
if (size < RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE ||
size > RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE) {
- return Status(Substitute(
- "$0 is not a valid Bloom filter size. Valid sizes are in [$1, $2].", value,
- RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
- RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+ return Status(Substitute("$0 is not a valid Bloom filter size for $1. "
+ "Valid sizes are in [$2, $3].", value, PrintTImpalaQueryOptions(
+ static_cast<TImpalaQueryOptions::type>(option)),
+ RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
+ RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE));
+ }
+ if (option == TImpalaQueryOptions::RUNTIME_BLOOM_FILTER_SIZE) {
+ query_options->__set_runtime_bloom_filter_size(size);
+ } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE) {
+ query_options->__set_runtime_filter_min_size(size);
+ } else if (option == TImpalaQueryOptions::RUNTIME_FILTER_MAX_SIZE) {
+ query_options->__set_runtime_filter_max_size(size);
}
- query_options->__set_runtime_bloom_filter_size(size);
break;
}
case TImpalaQueryOptions::RUNTIME_FILTER_WAIT_TIME_MS: {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 56e2e1a..6f4c214 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::S3_SKIP_INSERT_STAGING + 1);\
+ TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -77,7 +77,9 @@ class TQueryOptions;
QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
- QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING);
+ QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
+ QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
+ QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
/// Converts a TQueryOptions struct into a map of key, value pairs.
void TQueryOptionsToMap(const TQueryOptions& query_options,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index e255f0c..eed6df3 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -51,11 +51,10 @@ class BitUtil {
return (value / factor) * factor;
}
- /// Returns the smallest power of two that contains v. Taken from
+ /// Returns the smallest power of two that contains v. If v is a power of two, v is
+ /// returned. Taken from
/// http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
- /// TODO: Pick a better name, as it is not clear what happens when the input is
- /// already a power of two.
- static inline int64_t NextPowerOfTwo(int64_t v) {
+ static inline int64_t RoundUpToPowerOfTwo(int64_t v) {
--v;
v |= v >> 1;
v |= v >> 2;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 7e6a290..4cf606b 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -56,7 +56,8 @@ namespace impala {
// Macro to stamp out operator<< for thrift enums. Why doesn't thrift do this?
#define THRIFT_ENUM_OUTPUT_FN(E) THRIFT_ENUM_OUTPUT_FN_IMPL(E , _##E##_VALUES_TO_NAMES)
-// Macro to implement Print function that returns string for thrift enums
+// Macro to implement Print function that returns string for thrift enums. Make sure you
+// define a corresponding THRIFT_ENUM_OUTPUT_FN.
#define THRIFT_ENUM_PRINT_FN(E) \
string Print##E(const E::type& e) {\
stringstream ss;\
@@ -78,6 +79,7 @@ THRIFT_ENUM_OUTPUT_FN(CompressionCodec);
THRIFT_ENUM_OUTPUT_FN(Type);
THRIFT_ENUM_OUTPUT_FN(TMetricKind);
THRIFT_ENUM_OUTPUT_FN(TUnit);
+THRIFT_ENUM_OUTPUT_FN(TImpalaQueryOptions);
THRIFT_ENUM_PRINT_FN(TCatalogObjectType);
THRIFT_ENUM_PRINT_FN(TDdlType);
@@ -88,6 +90,7 @@ THRIFT_ENUM_PRINT_FN(QueryState);
THRIFT_ENUM_PRINT_FN(Encoding);
THRIFT_ENUM_PRINT_FN(TMetricKind);
THRIFT_ENUM_PRINT_FN(TUnit);
+THRIFT_ENUM_PRINT_FN(TImpalaQueryOptions);
ostream& operator<<(ostream& os, const TUniqueId& id) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 6872e66..c9550dc 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -69,6 +69,7 @@ std::string PrintEncoding(const parquet::Encoding::type& type);
std::string PrintAsHex(const char* bytes, int64_t len);
std::string PrintTMetricKind(const TMetricKind::type& type);
std::string PrintTUnit(const TUnit::type& type);
+std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
/// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
/// Returns the numeric path without column/field names, e.g. "[0,1,2]"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/fixed-size-hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/util/fixed-size-hash-table.h b/be/src/util/fixed-size-hash-table.h
index 8ecb328..828769e 100644
--- a/be/src/util/fixed-size-hash-table.h
+++ b/be/src/util/fixed-size-hash-table.h
@@ -46,8 +46,8 @@ class FixedSizeHashTable {
DCHECK_GT(min_capacity, 0);
// Capacity cannot be greater than largest uint32_t power of two.
capacity_ = static_cast<uint32_t>(std::min(static_cast<int64_t>(1) << 31,
- BitUtil::NextPowerOfTwo(min_capacity)));
- DCHECK_EQ(capacity_, BitUtil::NextPowerOfTwo(capacity_));
+ BitUtil::RoundUpToPowerOfTwo(min_capacity)));
+ DCHECK_EQ(capacity_, BitUtil::RoundUpToPowerOfTwo(capacity_));
if (tbl_ != NULL) free(tbl_);
int64_t tbl_byte_size = capacity_ * sizeof(Entry);
tbl_ = reinterpret_cast<Entry*>(malloc(tbl_byte_size));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc
index a6726fd..bd49371 100644
--- a/be/src/util/parse-util-test.cc
+++ b/be/src/util/parse-util-test.cc
@@ -31,7 +31,8 @@ TEST(ParseMemSpecs, Basic) {
bool is_percent;
int64_t bytes;
- int64_t megabytes = 1024 * 1024;
+ int64_t kilobytes = 1024;
+ int64_t megabytes = 1024 * kilobytes;
int64_t gigabytes = 1024 * megabytes;
bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem());
@@ -42,6 +43,14 @@ TEST(ParseMemSpecs, Basic) {
ASSERT_EQ(100, bytes);
ASSERT_FALSE(is_percent);
+ bytes = ParseUtil::ParseMemSpec("100kb", &is_percent, MemInfo::physical_mem());
+ ASSERT_EQ(100 * 1024, bytes);
+ ASSERT_FALSE(is_percent);
+
+ bytes = ParseUtil::ParseMemSpec("5KB", &is_percent, MemInfo::physical_mem());
+ ASSERT_EQ(5 * 1024, bytes);
+ ASSERT_FALSE(is_percent);
+
bytes = ParseUtil::ParseMemSpec("4MB", &is_percent, MemInfo::physical_mem());
ASSERT_EQ(4 * megabytes, bytes);
ASSERT_FALSE(is_percent);
@@ -77,6 +86,7 @@ TEST(ParseMemSpecs, Basic) {
bad_values.push_back("gb");
bad_values.push_back("1GMb");
bad_values.push_back("1b1Mb");
+ bad_values.push_back("1kib");
bad_values.push_back("1Bb");
bad_values.push_back("1%%");
bad_values.push_back("1.1");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index 60cf3ac..1a0af85 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -49,6 +49,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
number_str_len--;
multiplier = 1024L * 1024L;
break;
+ case 'k':
+ case 'K':
+ // Kilobytes
+ number_str_len--;
+ multiplier = 1024L;
+ break;
case '%':
// Don't allow a suffix of "%B".
if (suffix_char != mem_spec_str.rbegin()) return -1;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/be/src/util/parse-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h
index 5b13137..02ce120 100644
--- a/be/src/util/parse-util.h
+++ b/be/src/util/parse-util.h
@@ -27,6 +27,7 @@ class ParseUtil {
/// Sets *is_percent to indicate whether the given spec is in percent.
/// Accepted formats:
/// '<int>[bB]?' -> bytes (default if no unit given)
+ /// '<float>[kK(bB)]' -> kilobytes
/// '<float>[mM(bB)]' -> megabytes
/// '<float>[gG(bB)]' -> in gigabytes
/// '<int>%' -> in percent of relative_reference
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 611155c..b9892e6 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -192,6 +192,12 @@ struct TQueryOptions {
// those queries, the coordinator deletes all files in the final location before copying
// the files there.
45: optional bool s3_skip_insert_staging = true
+
+ // Minimum runtime filter size, in bytes
+ 46: optional i32 runtime_filter_min_size = 1048576
+
+ // Maximum runtime filter size, in bytes
+ 47: optional i32 runtime_filter_max_size = 16777216
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0a030ad..68647df 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,6 +219,12 @@ enum TImpalaQueryOptions {
// the files there.
// TODO: Find a way to get this working for INSERT OVERWRITEs too.
S3_SKIP_INSERT_STAGING
+
+ // Maximum runtime filter size, in bytes.
+ RUNTIME_FILTER_MAX_SIZE,
+
+ // Minimum runtime filter size, in bytes.
+ RUNTIME_FILTER_MIN_SIZE
}
// The summary of an insert.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 4432810..2d32064 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -259,6 +259,7 @@ row_regex: .*RowsReturned: 2.43K .*
SET RUNTIME_FILTER_WAIT_TIME_MS=15000;
SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4K;
select STRAIGHT_JOIN count(*) from alltypes a
join [BROADCAST]
# Build-side needs to be sufficiently large to trigger FP check.
@@ -335,6 +336,7 @@ select STRAIGHT_JOIN count(a.id) from alltypes a
####################################################
SET RUNTIME_FILTER_MODE=GLOBAL;
SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
join (select * from l LIMIT 1) b on a.l_orderkey = -b.l_orderkey;
@@ -347,6 +349,7 @@ row_regex: .*Filter 0 \(4.00 KB\).*
---- QUERY
SET RUNTIME_FILTER_MODE=GLOBAL;
SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
@@ -359,6 +362,7 @@ row_regex: .*Filter 0 \(256.00 KB\).*
---- QUERY
SET RUNTIME_FILTER_MODE=GLOBAL;
SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
@@ -371,6 +375,7 @@ row_regex: .*Filter 0 \(512.00 KB\).*
---- QUERY
SET RUNTIME_FILTER_MODE=GLOBAL;
SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=4KB;
with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;
@@ -380,3 +385,44 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
row_regex: .*1 of 1 Runtime Filter Published.*
row_regex: .*Filter 0 \(1.00 MB\).*
====
+
+
+---- QUERY
+####################################################
+# Test case 16: Filter sizes respect query options
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=8KB;
+SET RUNTIME_FILTER_MAX_SIZE=8KB;
+# This query would produce a 4KB filter without setting the minimum size.
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+# Check that filter sizes are rounded up to power-of-two
+SET RUNTIME_FILTER_MIN_SIZE=6000B;
+SET RUNTIME_FILTER_MAX_SIZE=6000B;
+select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
+---- RESULTS
+7300
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MAX_SIZE=8192;
+# Query would produce a 512KB filter without setting the max
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+ join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
+---- RUNTIME_PROFILE
+row_regex: .*0 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(8.00 KB\).*
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/df1412c9/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
index 324eb1c..4743f3e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_wait.test
@@ -24,6 +24,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_MAX_SIZE=4096;
select STRAIGHT_JOIN count(*) from alltypes a
join [BROADCAST]
# Build-side needs to be sufficiently large to trigger FP check.