You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/17 15:35:49 UTC

[impala] branch master updated: IMPALA-6267: MT scanners check filters per split

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f71e97f  IMPALA-6267: MT scanners check filters per split
f71e97f is described below

commit f71e97f0b30acf84fff58024ead4740166295c06
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Mar 11 12:06:34 2020 -0700

    IMPALA-6267: MT scanners check filters per split
    
    Refactor the code for checking each scan range
    against filters so that it can be shared between
    the MT and non-MT scan node implementations.
    Move it into StartNextScanRange(), which has the
    advantage that we can skip issuing then cancelling
    the I/O for the range.
    
    Testing:
    Added a regression test for the code path that failed
    for multithreaded scans before this fix. Looped the
    test for a couple of hours to flush out flakiness.
    
    Fix some runtime filter tests where the mt_dop from
    the dimensions was not applied. Fix
    test_wait_time_cancellation() to work with mt_dop > 0,
    where filters are waited for in Open() instead of GetNext(),
    which means that the query does not get into the RUNNING state
    while waiting for filters. Instead use the profile to detect
    that execution started.
    
    Ran core tests.
    
    Change-Id: Ic40eb4cb2419393e6f7cd7bd019add9224946c4d
    Reviewed-on: http://gerrit.cloudera.org:8080/15411
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc       | 40 ++++++++++++++++++++++++++++----
 be/src/exec/hdfs-scan-node-base.h        | 11 ++++++++-
 be/src/exec/hdfs-scan-node-mt.cc         |  7 +++---
 be/src/exec/hdfs-scan-node.cc            | 20 ++--------------
 tests/query_test/test_runtime_filters.py | 34 +++++++++++++++++++++++++--
 5 files changed, 84 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 10f9266..e515ab2 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -672,12 +672,44 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
   return true;
 }
 
-Status HdfsScanNodeBase::StartNextScanRange(int64_t* reservation,
-    ScanRange** scan_range) {
+void HdfsScanNodeBase::SkipScanRange(io::ScanRange* scan_range) {
+  // Avoid leaking unread buffers in scan_range.
+  scan_range->Cancel(Status::CancelledInternal("HDFS partition pruning"));
+  ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
+  int64_t partition_id = metadata->partition_id;
+  HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
+  DCHECK(partition != nullptr) << "table_id=" << hdfs_table_->id()
+                               << " partition_id=" << partition_id << "\n"
+                               << PrintThrift(runtime_state_->instance_ctx());
+  HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
+  if (metadata->is_sequence_header) {
+    // File ranges haven't been issued yet, skip entire file.
+    UpdateRemainingScanRangeSubmissions(-1);
+    SkipFile(partition->file_format(), desc);
+  } else {
+    // Mark this scan range as done.
+    HdfsScanNodeBase::RangeComplete(
+        partition->file_format(), desc->file_compression, true);
+  }
+}
+
+Status HdfsScanNodeBase::StartNextScanRange(const std::vector<FilterContext>& filter_ctxs,
+    int64_t* reservation, ScanRange** scan_range) {
   DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   bool needs_buffers;
-  RETURN_IF_ERROR(reader_context_->GetNextUnstartedRange(scan_range, &needs_buffers));
-  if (*scan_range == nullptr) return Status::OK();
+  // Loop until we've got a scan range or run out of ranges.
+  do {
+    RETURN_IF_ERROR(reader_context_->GetNextUnstartedRange(scan_range, &needs_buffers));
+    if (*scan_range == nullptr) return Status::OK();
+    if (filter_ctxs.size() > 0) {
+      int64_t partition_id =
+          static_cast<ScanRangeMetadata*>((*scan_range)->meta_data())->partition_id;
+      if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
+        SkipScanRange(*scan_range);
+        *scan_range = nullptr;
+      }
+    }
+  } while (*scan_range == nullptr);
   if (needs_buffers) {
     // Check if we should increase our reservation to read this range more efficiently.
     // E.g. if we are scanning a large text file, we might want extra I/O buffers to
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 0d8c5d2..66a15e9 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -434,6 +434,11 @@ class HdfsScanNodeBase : public ScanNode {
   bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
       const std::vector<FilterContext>& filter_ctxs);
 
+  /// Update book-keeping to skip the scan range if it has been issued but will not be
+  /// processed by a scanner. E.g. used to cancel ranges that are filtered out by
+  /// late-arriving filters that could not be applied in IssueInitialScanRanges()
+  void SkipScanRange(io::ScanRange* scan_range);
+
   /// Helper to increase reservation from 'curr_reservation' up to 'ideal_reservation'
   /// that may succeed in getting a partial increase if the full increase is not
   /// possible. First increases to an I/O buffer multiple then increases in I/O buffer
@@ -643,11 +648,15 @@ class HdfsScanNodeBase : public ScanNode {
   /// be increased by this function up to a computed "ideal" reservation, in which case
   /// *reservation is increased to reflect the new reservation.
   ///
+  /// Scan ranges are checked against 'filter_ctxs' and scan ranges belonging to
+  /// partitions that do not pass partition filters are filtered out.
+  ///
   /// Returns Status::OK() and sets 'scan_range' if it gets a range to process. Returns
   /// Status::OK() and sets 'scan_range' to NULL when no more ranges are left to process.
   /// Returns an error status if there was an error getting the range or allocating
   /// buffers.
-  Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
+  Status StartNextScanRange(const std::vector<FilterContext>& filter_ctxs,
+      int64_t* reservation, io::ScanRange** scan_range);
 
   /// Helper for the CreateAndOpenScanner() implementations in the subclass. Creates and
   /// opens a new scanner for this partition type. Depending on the outcome, the
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 2eeb8b8..596328f 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -70,7 +70,7 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
       scanner_.reset();
     }
     int64_t scanner_reservation = buffer_pool_client()->GetReservation();
-    RETURN_IF_ERROR(StartNextScanRange(&scanner_reservation, &scan_range_));
+    RETURN_IF_ERROR(StartNextScanRange(filter_ctxs_, &scanner_reservation, &scan_range_));
     if (scan_range_ == nullptr) {
       *eos = true;
       StopAndFinalizeCounters();
@@ -78,8 +78,9 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     }
     ScanRangeMetadata* metadata =
         static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
-    int64_t partition_id = metadata->partition_id;
-    HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
+    HdfsPartitionDescriptor* partition =
+        hdfs_table_->GetPartition(metadata->partition_id);
+    DCHECK(partition != nullptr);
     scanner_ctx_.reset(new ScannerContext(runtime_state_, this, buffer_pool_client(),
         scanner_reservation, partition, filter_ctxs(), expr_results_pool()));
     scanner_ctx_->AddStream(scan_range_, scanner_reservation);
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1af0ef4..4e19736 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -400,7 +400,8 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // StartNextScanRange() and the check for when all ranges are complete.
     int remaining_scan_range_submissions = remaining_scan_range_submissions_.Load();
     ScanRange* scan_range;
-    Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
+    Status status =
+        StartNextScanRange(filter_ctxs, &scanner_thread_reservation, &scan_range);
     if (!status.ok()) {
       unique_lock<timed_mutex> l(lock_);
       // If there was already an error, the main thread will do the cleanup
@@ -471,23 +472,6 @@ void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   DCHECK(partition != nullptr) << "table_id=" << hdfs_table_->id()
                                << " partition_id=" << partition_id
                                << "\n" << PrintThrift(runtime_state_->instance_ctx());
-
-  if (!PartitionPassesFilters(partition_id, FilterStats::SPLITS_KEY, filter_ctxs)) {
-    // Avoid leaking unread buffers in scan_range.
-    scan_range->Cancel(Status::CancelledInternal("HDFS partition pruning"));
-    HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
-    if (metadata->is_sequence_header) {
-      // File ranges haven't been issued yet, skip entire file.
-      UpdateRemainingScanRangeSubmissions(-1);
-      SkipFile(partition->file_format(), desc);
-    } else {
-      // Mark this scan range as done.
-      HdfsScanNodeBase::RangeComplete(partition->file_format(), desc->file_compression,
-          true);
-    }
-    return;
-  }
-
   ScannerContext context(runtime_state_, this, buffer_pool_client(),
       *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
   context.AddStream(scan_range, *scanner_thread_reservation);
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 5e53c6e..0360d00 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -94,12 +94,16 @@ class TestRuntimeFilters(ImpalaTestSuite):
     QUERY = """select straight_join *
                from alltypes t1
                     join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id"""
+    self.client.set_configuration(new_vector.get_value('exec_option'))
     self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT")
     self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", "10000000")
     # Run same query with different delays to better exercise call paths.
     for delay_s in [0, 1, 2]:
       handle = self.client.execute_async(QUERY)
-      self.wait_for_state(handle, QueryState.RUNNING, 10)
+      # Wait until all the fragments have started up.
+      BE_START_REGEX = 'All [0-9]* execution backends .* started'
+      while re.search(BE_START_REGEX, self.client.get_runtime_profile(handle)) is None:
+        time.sleep(0.2)
       time.sleep(delay_s)  # Give the query time to get blocked waiting for the filter.
       self.client.close_query(handle)
 
@@ -124,10 +128,36 @@ class TestRuntimeFilters(ImpalaTestSuite):
     self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
     result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner join
                                 (select * from alltypessmall where smallint_col=-1) v
-                                on v.year = alltypes.year""")
+                                on v.year = alltypes.year""",
+                                new_vector.get_value('exec_option'))
     assert re.search("Files rejected: 8 \(8\)", result.runtime_profile) is not None
     assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) is None
 
+  def test_file_filtering_late_arriving_filter(self, vector):
+    """Test that late-arriving filters are applied to files when the scanner starts processing
+    each scan range."""
+    if 'kudu' in str(vector.get_value('table_format')):
+      return
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.change_database(self.client, vector.get_value('table_format'))
+    self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
+    self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=1")
+    self.execute_query("SET NUM_SCANNER_THREADS=1")
+    # This query is crafted so that both scans execute slowly, but the filter should
+    # arrive before the destination scan finishes processing all of its files (there are 8
+    # files per executor). When I tested this, the filter reliably arrived after a single
+    # input file was processed, but the test will still pass as long as it arrives before
+    # the last file starts being processed.
+    result = self.execute_query("""select STRAIGHT_JOIN count(*) from alltypes inner join /*+shuffle*/
+                                     (select distinct * from alltypessmall
+                                      where smallint_col > sleep(100)) v
+                                     on v.id = alltypes.id
+                                   where alltypes.id < sleep(10);""",
+                                   new_vector.get_value('exec_option'))
+    assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) is not None
+
+
 @SkipIfLocal.multiple_impalad
 class TestBloomFilters(ImpalaTestSuite):
   @classmethod