You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/09 15:45:07 UTC

[impala] 01/02: IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5e69ae1d7dc113bbcc8d7d75e3b1b5244e76f76a
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Apr 6 22:24:05 2020 -0700

    IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival
    
    In function RuntimeFilter::WaitForArrival, there is a race condition
    where the condition variable arrival_cv_ may be signaled right after
    a thread enters the loop and before it calls arrival_cv_.WaitFor().
    This can cause the runtime filter to wait the entire
    RUNTIME_FILTER_WAIT_TIME_MS even though the filter has arrived or
    been canceled earlier than that. This commit avoids the race condition
    by making RuntimeFilter::SetFilter and RuntimeFilter::Cancel acquire
    arrival_mutex_ before checking the value of arrival_time_ and
    release arrival_mutex_ before signaling arrival_cv_.
    
    Testing:
    - Add new be test runtime-filter-test.cc
    - Pass core tests.
    
    Change-Id: I7dffa626103ef0af06ad1e89231b0d2ee54bb94a
    Reviewed-on: http://gerrit.cloudera.org:8080/15673
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/CMakeLists.txt         |   2 +
 be/src/runtime/runtime-filter-test.cc | 103 ++++++++++++++++++++++++++++++++++
 be/src/runtime/runtime-filter.cc      |  33 +++++++----
 be/src/runtime/runtime-filter.h       |   7 +++
 4 files changed, 133 insertions(+), 12 deletions(-)

diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6990421..12e279e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(RuntimeTests STATIC
   multi-precision-test.cc
   raw-value-test.cc
   row-batch-serialize-test.cc
+  runtime-filter-test.cc
   string-buffer-test.cc
   string-compare-test.cc
   string-search-test.cc
@@ -131,6 +132,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*")
 # Exception to unified be tests: Custom main function with global Frontend object
+ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*")
 ADD_BE_LSAN_TEST(row-batch-test)
 # Exception to unified be tests: Custom main function with global Frontend object
 ADD_BE_LSAN_TEST(collection-value-builder-test)
diff --git a/be/src/runtime/runtime-filter-test.cc b/be/src/runtime/runtime-filter-test.cc
new file mode 100644
index 0000000..76680a5
--- /dev/null
+++ b/be/src/runtime/runtime-filter-test.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread/thread.hpp>
+
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter.inline.h"
+#include "testutil/gtest-util.h"
+#include "util/stopwatch.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+namespace impala {
+
+class RuntimeFilterTest : public testing::Test {  // Declared a friend of RuntimeFilter.
+ protected:
+  ObjectPool pool_;  // Tests add their RuntimeFilter here; cleared in TearDown().
+  MemTracker tracker_;  // Passed to MinMaxFilter::Create() by the Arrived test.
+
+  virtual void SetUp() {}
+
+  virtual void TearDown() { pool_.Clear(); }
+
+  void SetDelay(RuntimeFilter* rf, int64_t delay) { rf->injection_delay_ = delay; }  // Writes the private test-only delay.
+};
+
+struct TestConfig {  // Parameters captured by reference in the test thread lambdas.
+  RuntimeFilter* runtime_filter;  // Filter under test.
+  int64_t injection_delay;  // Passed to SetDelay(); slept via SleepForMs(), so in ms.
+  int64_t wait_for_ms;  // Timeout handed to WaitForArrival().
+  MinMaxFilter* min_max_filter;  // Handed to SetFilter() in Arrived; nullptr in Canceled.
+};
+
+// Test that RuntimeFilter stops waiting after it is canceled.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Canceled) {
+  TRuntimeFilterDesc desc;
+  desc.__set_type(TRuntimeFilterType::MIN_MAX);
+  RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+  TestConfig tc = {rf, 500, 1000, nullptr};  // 500 ms injected delay, 1000 ms timeout.
+
+  SetDelay(rf, tc.injection_delay);
+  MonotonicStopWatch sw;
+  thread_group workers;
+
+  sw.Start();
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+  SleepForMs(100); // give waiting thread a head start
+  workers.add_thread(new thread([&tc] { tc.runtime_filter->Cancel(); }));
+  workers.join_all();  // Wait for both the waiting and the cancelling thread.
+  sw.Stop();  // ms * 1000000 below: ElapsedTime() is presumably in ns -- TODO confirm.
+
+  ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+  ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+// Test that RuntimeFilter stops waiting after the filter has arrived.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Arrived) {
+  TRuntimeFilterDesc desc;
+  desc.__set_type(TRuntimeFilterType::MIN_MAX);
+  RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+  MinMaxFilter* mmf =
+      MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &pool_, &tracker_);
+  TestConfig tc = {rf, 500, 1000, mmf};  // 500 ms injected delay, 1000 ms timeout.
+
+  SetDelay(rf, tc.injection_delay);
+  MonotonicStopWatch sw;
+  thread_group workers;
+
+  sw.Start();
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+  SleepForMs(100); // give waiting thread a head start
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); }));
+  workers.join_all();  // Wait for both the waiting and the delivering thread.
+  sw.Stop();  // ms * 1000000 below: ElapsedTime() is presumably in ns -- TODO confirm.
+
+  ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+  ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+} // namespace impala
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 4b013a6..1ff0a13 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -26,17 +26,20 @@ using namespace impala;
 const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
 
 void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
-  DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
-  DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
-  if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
-  if (is_bloom_filter()) {
-    bloom_filter_.Store(bloom_filter);
-  } else {
-    DCHECK(is_min_max_filter());
-    min_max_filter_.Store(min_max_filter);
+  {  // Lock scope: arrival_mutex_ is released before NotifyAll() below.
+    unique_lock<mutex> l(arrival_mutex_);  // Held while checking and setting arrival_time_.
+    DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
+    DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+    if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
+    if (is_bloom_filter()) {
+      bloom_filter_.Store(bloom_filter);
+    } else {
+      DCHECK(is_min_max_filter());
+      min_max_filter_.Store(min_max_filter);
+    }
+    arrival_time_.Store(MonotonicMillis());  // Non-zero arrival_time_ marks arrival.
+    has_filter_.Store(true);
   }
-  arrival_time_.Store(MonotonicMillis());
-  has_filter_.Store(true);
   arrival_cv_.NotifyAll();
 }
 
@@ -64,8 +67,11 @@ void RuntimeFilter::Or(RuntimeFilter* other) {
 }
 
 void RuntimeFilter::Cancel() {
-  if (arrival_time_.Load() != 0) return;
-  arrival_time_.Store(MonotonicMillis());
+  {  // Lock scope: arrival_mutex_ is released before NotifyAll() below.
+    unique_lock<mutex> l(arrival_mutex_);  // Held while checking and setting arrival_time_.
+    if (arrival_time_.Load() != 0) return;  // Already arrived or cancelled; nothing to do.
+    arrival_time_.Store(MonotonicMillis());
+  }
   arrival_cv_.NotifyAll();
 }
 
@@ -75,6 +81,9 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
     int64_t ms_since_registration = MonotonicMillis() - registration_time_;
     int64_t ms_remaining = timeout_ms - ms_since_registration;
     if (ms_remaining <= 0) break;
+#ifndef NDEBUG
+    if (injection_delay_ > 0) SleepForMs(injection_delay_);
+#endif
     arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI);
   }
   return arrival_time_.Load() != 0;
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5108f83..e2a4a67 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -28,6 +28,7 @@
 namespace impala {
 
 class BloomFilter;
+class RuntimeFilterTest;
 
 /// RuntimeFilters represent set-membership predicates that are computed during query
 /// execution (rather than during planning). They can then be sent to other operators to
@@ -117,6 +118,8 @@ class RuntimeFilter {
   static const char* LLVM_CLASS_NAME;
 
  private:
+  friend class RuntimeFilterTest;
+
   /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
   /// it does not filter any rows, either because it was not created
   /// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
@@ -148,5 +151,9 @@ class RuntimeFilter {
   /// Signalled when a filter arrives or the filter is cancelled. Paired with
   /// 'arrival_mutex_'
   mutable ConditionVariable arrival_cv_;
+
+  /// Injection delay for WaitForArrival. Used in testing only.
+  /// See IMPALA-9612.
+  int64_t injection_delay_ = 0;
 };
 }