You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/10/25 23:10:13 UTC

[impala] 02/03: IMPALA-9065: don't block indefinitely for filters

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9100a98273aa840dc6781c446757b97db50c8b47
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Oct 17 13:25:53 2019 -0700

    IMPALA-9065: don't block indefinitely for filters
    
    This patch ensures that query cancellation will
    promptly wake up any threads blocked waiting for runtime
    filters to arrive. Before this patch, threads would
    wait for up to RUNTIME_FILTER_WAIT_TIME_MS after the
    query was cancelled.
    
    Testing:
    * Add a cancellation test with a high runtime filter wait time
      that reproduces the threads getting stuck. This test
      failed reliably without the code changes.
    * Also update metric verification to check that no fragments
      are left running when tests are finished.
    * Ran exhaustive tests.
    * Ran a 10000 query TPC-H Kudu and TPC-DS Parquet stress test on
      a minicluster with 3 impalads.
    
    Change-Id: I0a70e4451c2b48c97f854246e90b71f6e5d67710
    Reviewed-on: http://gerrit.cloudera.org:8080/14499
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.h     |  7 +++---
 be/src/runtime/fragment-instance-state.cc |  8 ++++++-
 be/src/runtime/runtime-filter-bank.cc     | 17 ++++++++++++++
 be/src/runtime/runtime-filter-bank.h      | 10 +++++++++
 be/src/runtime/runtime-filter.cc          | 37 ++++++++++++++++++++++++-------
 be/src/runtime/runtime-filter.h           | 27 +++++++++++++++++-----
 be/src/runtime/runtime-filter.inline.h    | 12 ----------
 be/src/runtime/runtime-state.cc           |  5 +++++
 be/src/runtime/runtime-state.h            | 12 ++++++----
 tests/query_test/test_runtime_filters.py  | 37 +++++++++++++++++++++++++++++++
 tests/verifiers/metric_verifier.py        |  5 +----
 11 files changed, 140 insertions(+), 37 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index 2c46e4f..c908cda 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -143,9 +143,9 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// Returns true if the 'queue' (not the 'batch_queue_') is empty. 'queue' refers to
   /// the logical queue of RowBatches and thus includes any RowBatch that
   /// 'current_batch_' points to. Must be called while holding 'lock_'. Cannot be called
-  /// once the query has been cancelled or closed.
+  /// once the sink has been closed.
   bool IsQueueEmpty(RuntimeState* state) const {
-    DCHECK(!IsCancelledOrClosed(state));
+    DCHECK(!closed_);
     return batch_queue_->IsEmpty() && current_batch_row_ == 0;
   }
 
@@ -157,7 +157,8 @@ class BufferedPlanRootSink : public PlanRootSink {
   }
 
   /// Returns true if the query has been cancelled or if the PlanRootSink has been
-  /// closed, returns false otherwise.
+  /// closed, returns false otherwise. Cancellation can occur asynchronously, so this
+  /// may become true at any point.
   bool IsCancelledOrClosed(RuntimeState* state) const {
     return state->is_cancelled() || closed_;
   }
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 5d5e3c4..a57f69c 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -122,7 +122,7 @@ done:
 
 void FragmentInstanceState::Cancel() {
   DCHECK(runtime_state_ != nullptr);
-  runtime_state_->set_is_cancelled();
+  runtime_state_->Cancel();
   if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
   ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
@@ -384,6 +384,12 @@ Status FragmentInstanceState::ExecInternal() {
 void FragmentInstanceState::Close() {
   DCHECK(runtime_state_ != nullptr);
 
+  // Required to wake up any threads that might be blocked waiting for filters, e.g.
+  // scanner threads.
+  // TODO: we might be able to remove this with mt_dop, since we only need to worry
+  // have the fragment thread (i.e. the current thread).
+  Cancel();
+
   // If we haven't already released this thread token in Prepare(), release
   // it before calling Close().
   if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 11026cb..56f63aa 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -85,6 +85,10 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
   } else {
     if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
       ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
+      // The filter bank may have already been cancelled. In that case, still allocate the
+      // filter but cancel it immediately, so that callers of RuntimeFilterBank don't need
+      // to have separate handling of that case.
+      if (cancelled_) ret->Cancel();
       consumed_filters_[filter_desc.filter_id] = ret;
       VLOG(2) << "registered consumer filter " << filter_desc.filter_id;
     } else {
@@ -261,8 +265,21 @@ bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv)
   return fpp > FLAGS_max_filter_error_rate;
 }
 
+void RuntimeFilterBank::Cancel() {
+  lock_guard<mutex> l(runtime_filter_lock_);
+  CancelLocked();
+}
+
+void RuntimeFilterBank::CancelLocked() {
+  if (cancelled_) return;
+  // Cancel all filters that a thread might be waiting on.
+  for (auto& entry : consumed_filters_) entry.second->Cancel();
+  cancelled_ = true;
+}
+
 void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
+  CancelLocked();
   closed_ = true;
   for (BloomFilter* filter : bloom_filters_) filter->Close();
   for (MinMaxFilter* filter : min_max_filters_) filter->Close();
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 907d6c4..1208f03 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -124,6 +124,10 @@ class RuntimeFilterBank {
   /// Default hash seed to use when computing hashed values to insert into filters.
   static int32_t IR_ALWAYS_INLINE DefaultHashSeed() { return 1234; }
 
+  /// Called to signal that the query is being cancelled. Wakes up any threads blocked
+  /// waiting for filters to allow them to finish.
+  void Cancel();
+
   /// Releases all memory allocated for BloomFilters.
   void Close();
 
@@ -131,6 +135,9 @@ class RuntimeFilterBank {
   static const int64_t MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;  // 512MB
 
  private:
+  /// Implementation of Cancel(). 'runtime_filter_lock_' must be held by caller.
+  void CancelLocked();
+
   /// Lock protecting produced_filters_ and consumed_filters_.
   boost::mutex runtime_filter_lock_;
 
@@ -152,6 +159,9 @@ class RuntimeFilterBank {
   /// MemTracker to track Bloom filter memory.
   boost::scoped_ptr<MemTracker> filter_mem_tracker_;
 
+  /// True iff Cancel() or Close() has been called. Protected by 'runtime_filter_lock_'.
+  bool cancelled_ = false;
+
   /// True iff Close() has been called. Used to prevent races between
   /// AllocateScratchBloomFilter() and Close().
   bool closed_;
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index a2fd30e..813d348 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -23,15 +23,36 @@
 
 using namespace impala;
 
-const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
-
 const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
 
-bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
-  do {
-    if (HasFilter()) return true;
-    SleepForMs(SLEEP_PERIOD_MS);
-  } while ((MonotonicMillis() - registration_time_) < timeout_ms);
+void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
+  DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
+  DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+  if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
+  if (is_bloom_filter()) {
+    bloom_filter_.Store(bloom_filter);
+  } else {
+    DCHECK(is_min_max_filter());
+    min_max_filter_.Store(min_max_filter);
+  }
+  arrival_time_.Store(MonotonicMillis());
+  has_filter_.Store(true);
+  arrival_cv_.NotifyAll();
+}
 
-  return HasFilter();
+void RuntimeFilter::Cancel() {
+  if (arrival_time_.Load() != 0) return;
+  arrival_time_.Store(MonotonicMillis());
+  arrival_cv_.NotifyAll();
+}
+
+bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
+  unique_lock<mutex> l(arrival_mutex_);
+  while (arrival_time_.Load() == 0) {
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    int64_t ms_remaining = timeout_ms - ms_since_registration;
+    if (ms_remaining <= 0) break;
+    arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI);
+  }
+  return arrival_time_.Load() != 0;
 }
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 97a2842..5b65f7a 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -19,10 +19,12 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H
 #define IMPALA_RUNTIME_RUNTIME_FILTER_H
 
+#include <boost/thread/mutex.hpp>
+
 #include "runtime/raw-value.h"
 #include "runtime/runtime-filter-bank.h"
 #include "util/bloom-filter.h"
-#include "util/spinlock.h"
+#include "util/condition-variable.h"
 #include "util/time.h"
 
 namespace impala {
@@ -50,7 +52,7 @@ class RuntimeFilter {
   }
 
   /// Returns true if SetFilter() has been called.
-  bool HasFilter() const { return arrival_time_.Load() != 0; }
+  bool HasFilter() const { return has_filter_.Load(); }
 
   const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
   int32_t id() const { return filter_desc().filter_id; }
@@ -67,7 +69,11 @@ class RuntimeFilter {
 
   /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
-  inline void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
+  void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
+
+  /// Signal that no filter should be arriving, waking up any threads blocked in
+  /// WaitForArrival().
+  void Cancel();
 
   /// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is
   /// not in that 'bloom_filter_'. Otherwise returns true. Is safe to call concurrently
@@ -113,14 +119,25 @@ class RuntimeFilter {
   /// Reference to the filter's thrift descriptor in the thrift Plan tree.
   const TRuntimeFilterDesc& filter_desc_;
 
-  /// Time, in ms, that the filter was registered.
+  /// Time in ms (from MonotonicMillis()), that the filter was registered.
   const int64_t registration_time_;
 
-  /// Time, in ms, that the global filter arrived. Set in SetFilter().
+  /// Time, in ms (from MonotonicMillis()), that the global filter arrived, or the
+  /// filter was cancelled. Set in SetFilter() or Cancel().
   AtomicInt64 arrival_time_;
 
+  /// Only set after arrival_time_, if SetFilter() was called.
+  AtomicBool has_filter_{false};
+
   /// The size of the Bloom filter, in bytes.
   const int64_t filter_size_;
+
+  /// Lock to protect 'arrival_cv_'
+  mutable boost::mutex arrival_mutex_;
+
+  /// Signalled when a filter arrives or the filter is cancelled. Paired with
+  /// 'arrival_mutex_'
+  mutable ConditionVariable arrival_cv_;
 };
 
 }
diff --git a/be/src/runtime/runtime-filter.inline.h b/be/src/runtime/runtime-filter.inline.h
index b2de81d..fee6c17 100644
--- a/be/src/runtime/runtime-filter.inline.h
+++ b/be/src/runtime/runtime-filter.inline.h
@@ -37,18 +37,6 @@ inline const RuntimeFilter* RuntimeFilterBank::GetRuntimeFilter(int32_t filter_i
   return it->second;
 }
 
-inline void RuntimeFilter::SetFilter(
-    BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
-  DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
-  if (is_bloom_filter()) {
-    bloom_filter_.Store(bloom_filter);
-  } else {
-    DCHECK(is_min_max_filter());
-    min_max_filter_.Store(min_max_filter);
-  }
-  arrival_time_.Store(MonotonicMillis());
-}
-
 inline bool RuntimeFilter::AlwaysTrue() const {
   if (is_bloom_filter()) {
     return HasFilter() && bloom_filter_.Load() == BloomFilter::ALWAYS_TRUE_FILTER;
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index b16fb58..6d1ab84 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -236,6 +236,11 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
   return Status::OK();
 }
 
+void RuntimeState::Cancel() {
+  is_cancelled_.Store(true);
+  if (filter_bank_ != nullptr) filter_bank_->Cancel();
+}
+
 double RuntimeState::ComputeExchangeScanRatio() const {
   int64_t bytes_read = 0;
   for (const auto& c : bytes_read_counters_) bytes_read += c->value();
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 3c8c321..16c6df4 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -26,6 +26,7 @@
 
 // NOTE: try not to add more headers here: runtime-state.h is included in many many files.
 #include "common/global-types.h"  // for PlanNodeId
+#include "common/atomic.h"
 #include "runtime/client-cache-types.h"
 #include "runtime/dml-exec-state.h"
 #include "util/error-util-internal.h"
@@ -230,8 +231,8 @@ class RuntimeState {
   /// true or the error is not recoverable and should be handled upstream.
   Status LogOrReturnError(const ErrorMsg& message);
 
-  bool is_cancelled() const { return is_cancelled_; }
-  void set_is_cancelled() { is_cancelled_ = true; }
+  bool is_cancelled() const { return is_cancelled_.Load(); }
+  void Cancel();
 
   RuntimeProfile::Counter* total_storage_wait_timer() {
     return total_storage_wait_timer_;
@@ -385,8 +386,11 @@ class RuntimeState {
   /// execution. Owned by 'query_state_'.
   ReservationTracker* const instance_buffer_reservation_;
 
-  /// if true, execution should stop with a CANCELLED status
-  bool is_cancelled_ = false;
+  /// If true, execution should stop, either because the query was cancelled by the
+  /// client, or because execution of the fragment instance is finished. If the main
+  /// fragment instance thread is still running, it should terminate with a CANCELLED
+  /// status once it notices is_cancelled_ == true.
+  AtomicBool is_cancelled_{false};
 
   /// if true, ReleaseResources() was called.
   bool released_resources_ = false;
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index b1d0756..6c7c41f 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -20,9 +20,12 @@ import pytest
 import re
 import time
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.environ import build_flavor_timeout
+from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal, SkipIfIsilon
+from tests.verifiers.metric_verifier import MetricVerifier
 
 # slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
 # runtime filters don't arrive in time.
@@ -61,6 +64,40 @@ class TestRuntimeFilters(ImpalaTestSuite):
         "Query took too long (%ss, possibly waiting for missing filters?)" \
         % str(duration_s)
 
+  @pytest.mark.execute_serially
+  def test_wait_time_cancellation(self, vector):
+    """Regression test for IMPALA-9065 to ensure that threads waiting for filters
+    get woken up and exit promptly when the query is cancelled."""
+    # Make sure the cluster is quiesced before we start this test
+    self._verify_no_fragments_running()
+
+    self.change_database(self.client, vector.get_value('table_format'))
+    # Set up a query where a scan (plan node 0, scanning alltypes) will wait
+    # indefinitely for a filter to arrive. The filter arrival is delayed
+    # by adding a wait to the scan of alltypestime (plan node 0).
+    QUERY = """select straight_join *
+               from alltypes t1
+                    join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id"""
+    self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT")
+    self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", "10000000")
+    # Run same query with different delays to better exercise call paths.
+    for delay_s in [0, 1, 2]:
+      handle = self.client.execute_async(QUERY)
+      self.wait_for_state(handle, QueryState.RUNNING, 10)
+      time.sleep(delay_s)  # Give the query time to get blocked waiting for the filter.
+      self.client.close_query(handle)
+
+      # Ensure that cancellation has succeeded and the cluster has quiesced.
+      self._verify_no_fragments_running()
+
+  def _verify_no_fragments_running(self):
+    """Raise an exception if there are fragments running on the cluster after a
+    timeout."""
+    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=10)
+      verifier.wait_for_backend_admission_control_state(timeout=10)
+
   def test_file_filtering(self, vector):
     if 'kudu' in str(vector.get_value('table_format')):
       return
diff --git a/tests/verifiers/metric_verifier.py b/tests/verifiers/metric_verifier.py
index aff35b3..df51822 100644
--- a/tests/verifiers/metric_verifier.py
+++ b/tests/verifiers/metric_verifier.py
@@ -26,10 +26,7 @@ LOG.setLevel(level=logging.DEBUG)
 # List of metrics that should be equal to zero when there are no outstanding queries.
 METRIC_LIST = [
                "impala-server.num-queries-registered",
-               # TODO (IMPALA-3377): Re-enable
-               # "impala-server.backends.client-cache.clients-in-use", disabled as a
-               # work-around due to IMPALA-3327.
-               #"impala-server.backends.client-cache.clients-in-use",
+               "impala-server.num-fragments-in-flight",
                "impala-server.io-mgr.num-open-files",
                "impala-server.num-files-open-for-insert",
                "impala-server.scan-ranges.num-missing-volume-id",