You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/09 15:45:07 UTC
[impala] 01/02: IMPALA-9612: Fix race condition in
RuntimeFilter::WaitForArrival
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5e69ae1d7dc113bbcc8d7d75e3b1b5244e76f76a
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Apr 6 22:24:05 2020 -0700
IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival
In function RuntimeFilter::WaitForArrival, there is a race condition
where condition variable arrival_cv_ may be signaled right after the
thread gets into the loop and before it calls arrival_cv_.WaitFor().
This can cause the runtime filter to wait the entire
RUNTIME_FILTER_WAIT_TIME_MS even though the filter has arrived or been
canceled earlier than that. This commit avoids the race condition by
making RuntimeFilter::SetFilter and RuntimeFilter::Cancel acquire
arrival_mutex_ first before checking the value of arrival_time_ and
release arrival_mutex_ before signaling arrival_cv_.
Testing:
- Add new be test runtime-filter-test.cc
- Pass core tests.
Change-Id: I7dffa626103ef0af06ad1e89231b0d2ee54bb94a
Reviewed-on: http://gerrit.cloudera.org:8080/15673
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/CMakeLists.txt | 2 +
be/src/runtime/runtime-filter-test.cc | 103 ++++++++++++++++++++++++++++++++++
be/src/runtime/runtime-filter.cc | 33 +++++++----
be/src/runtime/runtime-filter.h | 7 +++
4 files changed, 133 insertions(+), 12 deletions(-)
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6990421..12e279e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(RuntimeTests STATIC
multi-precision-test.cc
raw-value-test.cc
row-batch-serialize-test.cc
+ runtime-filter-test.cc
string-buffer-test.cc
string-compare-test.cc
string-search-test.cc
@@ -131,6 +132,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*")
ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*")
# Exception to unified be tests: Custom main function with global Frontend object
ADD_BE_LSAN_TEST(row-batch-test)
# Exception to unified be tests: Custom main function with global Frontend object
ADD_BE_LSAN_TEST(collection-value-builder-test)
diff --git a/be/src/runtime/runtime-filter-test.cc b/be/src/runtime/runtime-filter-test.cc
new file mode 100644
index 0000000..76680a5
--- /dev/null
+++ b/be/src/runtime/runtime-filter-test.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread/thread.hpp>
+
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter.inline.h"
+#include "testutil/gtest-util.h"
+#include "util/stopwatch.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+namespace impala {
+
+class RuntimeFilterTest : public testing::Test {
+ protected:
+ ObjectPool pool_;
+ MemTracker tracker_;
+
+ virtual void SetUp() {}
+
+ virtual void TearDown() { pool_.Clear(); }
+
+ void SetDelay(RuntimeFilter* rf, int64_t delay) { rf->injection_delay_ = delay; }
+};
+
+struct TestConfig { // Per-test inputs; delay and wait are in milliseconds.
+ RuntimeFilter* runtime_filter;
+ int64_t injection_delay;
+ int64_t wait_for_ms;
+ MinMaxFilter* min_max_filter;
+};
+
+// Test that RuntimeFilter stops waiting after it is canceled.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Canceled) {
+ TRuntimeFilterDesc desc;
+ desc.__set_type(TRuntimeFilterType::MIN_MAX);
+ RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+ TestConfig tc = {rf, 500, 1000, nullptr};
+
+ SetDelay(rf, tc.injection_delay);
+ MonotonicStopWatch sw;
+ thread_group workers;
+
+ sw.Start();
+ workers.add_thread(
+ new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+ SleepForMs(100); // give the waiting thread a head start
+ workers.add_thread(new thread([&tc] { tc.runtime_filter->Cancel(); }));
+ workers.join_all();
+ sw.Stop();
+
+ ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+ ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+// Test that RuntimeFilter stops waiting after the filter arrives.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Arrived) {
+ TRuntimeFilterDesc desc;
+ desc.__set_type(TRuntimeFilterType::MIN_MAX);
+ RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+ MinMaxFilter* mmf =
+ MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &pool_, &tracker_);
+ TestConfig tc = {rf, 500, 1000, mmf};
+
+ SetDelay(rf, tc.injection_delay);
+ MonotonicStopWatch sw;
+ thread_group workers;
+
+ sw.Start();
+ workers.add_thread(
+ new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+ SleepForMs(100); // give the waiting thread a head start
+ workers.add_thread(
+ new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); }));
+ workers.join_all();
+ sw.Stop();
+
+ ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+ ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+} // namespace impala
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 4b013a6..1ff0a13 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -26,17 +26,20 @@ using namespace impala;
const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
- DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
- DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
- if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
- if (is_bloom_filter()) {
- bloom_filter_.Store(bloom_filter);
- } else {
- DCHECK(is_min_max_filter());
- min_max_filter_.Store(min_max_filter);
+ {
+ unique_lock<mutex> l(arrival_mutex_); // Held so a waiter cannot miss the notify.
+ DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
+ DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+ if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
+ if (is_bloom_filter()) {
+ bloom_filter_.Store(bloom_filter);
+ } else {
+ DCHECK(is_min_max_filter());
+ min_max_filter_.Store(min_max_filter);
+ }
+ arrival_time_.Store(MonotonicMillis());
+ has_filter_.Store(true);
}
- arrival_time_.Store(MonotonicMillis());
- has_filter_.Store(true);
arrival_cv_.NotifyAll();
}
@@ -64,8 +67,11 @@ void RuntimeFilter::Or(RuntimeFilter* other) {
}
void RuntimeFilter::Cancel() {
- if (arrival_time_.Load() != 0) return;
- arrival_time_.Store(MonotonicMillis());
+ {
+ unique_lock<mutex> l(arrival_mutex_); // Held so a waiter cannot miss the notify.
+ if (arrival_time_.Load() != 0) return;
+ arrival_time_.Store(MonotonicMillis());
+ }
arrival_cv_.NotifyAll();
}
@@ -75,6 +81,9 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
int64_t ms_remaining = timeout_ms - ms_since_registration;
if (ms_remaining <= 0) break;
+#ifndef NDEBUG
+ if (injection_delay_ > 0) SleepForMs(injection_delay_); // Test hook (IMPALA-9612).
+#endif
arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI);
}
return arrival_time_.Load() != 0;
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5108f83..e2a4a67 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -28,6 +28,7 @@
namespace impala {
class BloomFilter;
+class RuntimeFilterTest;
/// RuntimeFilters represent set-membership predicates that are computed during query
/// execution (rather than during planning). They can then be sent to other operators to
@@ -117,6 +118,8 @@ class RuntimeFilter {
static const char* LLVM_CLASS_NAME;
private:
+ friend class RuntimeFilterTest; // Lets the test fixture set injection_delay_.
+
/// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
/// it does not filter any rows, either because it was not created
/// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
@@ -148,5 +151,9 @@ class RuntimeFilter {
/// Signalled when a filter arrives or the filter is cancelled. Paired with
/// 'arrival_mutex_'
mutable ConditionVariable arrival_cv_;
+
+ /// Milliseconds WaitForArrival() sleeps before each wait, in debug builds only.
+ /// Used in testing only to exercise the race fixed by IMPALA-9612.
+ int64_t injection_delay_ = 0;
};
}