Posted to commits@impala.apache.org by ph...@apache.org on 2019/02/01 19:15:29 UTC

[impala] 05/05: IMPALA-7980: Fix spinning because of buggy num_unqueued_files_.

This is an automated email from the ASF dual-hosted git repository.

philz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a8e30506aafef14646d95a56fb87cf7c28d259d6
Author: Philip Zeyliger <ph...@cloudera.com>
AuthorDate: Fri Dec 14 09:28:26 2018 -0800

    IMPALA-7980: Fix spinning because of buggy num_unqueued_files_.
    
    This commit removes num_unqueued_files_ and replaces it with the more
    tightly scoped and easier-to-reason-about
    remaining_scan_range_submissions_ variable. This variable (and its
    predecessor) is used to signal to scanner threads that they may exit
    (instead of spinning) because no more scan ranges will ever be
    provided to them. In practice, most scanner implementations never
    call AddDiskIoRanges() after IssueInitialRanges(). The exceptions are
    the sequence-file and Avro scanners, which share a common base class.
    Instead of incrementing and decrementing this counter in a variety of
    paths, this commit makes the common case simple (set the counter to 1
    initially; decrement it at the exit points of IssueInitialRanges())
    and handles the complicated sequence-file case within
    base-sequence-scanner.cc.
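    
    Roughly, the new protocol can be sketched as follows. This is a
    simplified illustration using standard atomics, not the actual Impala
    classes; the method names mirror the real ones, but the bodies are
    boiled down:
    
      // Minimal sketch of the remaining_scan_range_submissions_ protocol.
      #include <atomic>
      #include <cassert>
      #include <cstdint>

      struct ScanNodeSketch {
        // Starts at 1: the initial IssueInitialRanges() pass counts as one
        // pending submission.
        std::atomic<int32_t> remaining_scan_range_submissions_{1};

        void UpdateRemainingScanRangeSubmissions(int32_t delta) {
          int32_t now = remaining_scan_range_submissions_.fetch_add(delta) + delta;
          assert(now >= 0);  // The counter must never go negative.
        }

        // Common case: all ranges are issued up front; releasing the single
        // initial "slot" lets idle scanner threads exit.
        void IssueInitialRangesSimple() {
          // ... AddDiskIoRanges(all splits) ...
          UpdateRemainingScanRangeSubmissions(-1);
        }

        // Sequence-file case: each header range will later trigger another
        // AddDiskIoRanges() (or a SkipFile()) from a scanner thread, so
        // reserve one submission per header before releasing the initial
        // slot.
        void IssueInitialRangesSequence(int32_t num_header_ranges) {
          UpdateRemainingScanRangeSubmissions(num_header_ranges);
          // ... AddDiskIoRanges(header ranges) ...
          UpdateRemainingScanRangeSubmissions(-1);
          // Each scanner thread later calls
          // UpdateRemainingScanRangeSubmissions(-1) exactly once per header,
          // after it has either issued the file's ranges or skipped the file.
        }
      };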
    
    Note that this is not the first instance of a subtle bug
    in this code. The following two JIRAs (and their corresponding
    commits) addressed fundamentally similar bugs:
        IMPALA-3798: Disable per-split filtering for sequence-based scanners
        IMPALA-1730: reduce scanner thread spinning windows
    
    We ran into this bug when running TPC-DS query 1 at scale factor
    10,000 (10TB) on a 140-node cluster with replica_preference=remote.
    We observed very high system CPU usage for some of the scan nodes:
    
      HDFS_SCAN_NODE (id=6):(Total: 59s107ms, non-child: 59s107ms, % non-child: 100.00%)
        - BytesRead: 80.50 MB (84408563)
        - ScannerThreadsSysTime: 36m17s
    
    Using 36 minutes of system time in only 1 minute of wall-clock time
    means roughly 30 threads were spinning in the kernel. Using perf, we
    found heavy use of futex_wait() and pthread_cond_wait(). Eventually,
    we figured out that ScannerThreads, once started, loop forever
    looking for work. The case where there is no work is supposed to be
    rare, and the scanner threads are supposed to exit based on
    num_unqueued_files_ being 0, but in some cases that counter isn't
    appropriately decremented.
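    
    In simplified form (illustrative only, not the actual Impala loop),
    the exit check and the way a skipped file could wedge it look like
    this:
    
      #include <atomic>
      #include <cstdint>
      #include <optional>

      // Stand-in for handing out queued scan ranges; std::nullopt means the
      // IO manager has nothing queued for this node right now.
      std::optional<int> StartNextScanRange() { return std::nullopt; }

      std::atomic<int32_t> num_unqueued_files{0};

      // One iteration of the scanner-thread loop. Returns true if the thread
      // should keep looping, false if it may exit.
      bool ScannerThreadIteration() {
        // Snapshot the counter *before* asking for work, as the real loop
        // does, so the check below can't race with a late submission.
        int32_t unqueued = num_unqueued_files.load();
        std::optional<int> range = StartNextScanRange();
        if (range) {
          // ... process the scan range ...
          return true;
        }
        // Exit only when no range was handed out AND no file can still add
        // ranges. The bug: a file skipped by a runtime filter never
        // decremented num_unqueued_files, so 'unqueued' stayed above zero
        // and the thread kept looping with no work, burning system CPU.
        return unqueued != 0;
      }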
    
    The reproduction is any query that uses runtime filters to filter out
    entire files. Something like:
    
      set RUNTIME_FILTER_WAIT_TIME_MS=10000;
      select count(*)
      from customer
      join customer_address on c_current_addr_sk = ca_address_sk
      where ca_street_name="DoesNotExist" and c_last_name="DoesNotExist";
    
    triggers this behavior. This code path is covered by several existing
    tests, most directly in test_runtime_filters.py:test_file_filtering().
    Interestingly, though this wastes cycles, query results are unaffected.
    
    I initially fixed this bug with a point fix that handled the case when
    runtime filters caused files to be skipped and added assertions that
    checked that num_unqueued_files_ was decremented to zero when queries
    finished. Doing this led me, somewhat slowly, to both finding similar
    bugs in other parts of the code (HdfsTextScanner::IssueInitialRanges had
    the same bug if the entire file was skipped) and fighting with races on
    the assertion itself. I eventually concluded that there's really no
    shared synchronization between progress_.Done() and num_unqueued_files_.
    The same holds for the new implementation, so it has no such
    assertions.
    
    I added a metric for how many times the scanners run through their
    loop without doing any work and observed it to be non-zero
    for a query from tests/query_test/test_runtime_filters.py:test_wait_time.
    
    To measure the effect of this, I set up a cluster of 9 impalads and
    1 coordinator, running against an entirely remote HDFS. The machines
    were r4.4xlarge and the remote disks were EBS st1 volumes, though
    everything was likely in the buffer cache. I ran
    TPCDS-Q1 with RUNTIME_FILTER_WAIT_TIME_MS=2000 against
    tpcds_1000_decimal_parquet 10 times. The most noticeable effect is
    that ScannerThreadsSysTime went from 5.6 seconds per query to 1.9
    seconds per query. (I ran the text profiles through this
    old-fashioned pipeline:
      grep ScannerThreadsSysTime profiles | awk '/ms/ { x += $3/1000 } /ns/ { x += $3/1000000 } END { print x }'
    )
    The effect on query time was quite small (the fastest query was
    3.373s with the change and 3.82s without, and the averages were even
    closer), but the extra work was visible in the profiles.
    
    Along the way, I renamed HdfsScanNode::file_type_counts_ to
    HdfsScanNode::file_type_counts_lock_ because
    HdfsScanNodeBase::file_type_counts_ also exists and is something
    entirely different.
    
    This bug was co-debugged by Todd Lipcon, Joe McDonnell, and Philip
    Zeyliger.
    
    Change-Id: I133de13238d3d05c510e2ff771d48979125735b1
    Reviewed-on: http://gerrit.cloudera.org:8080/12097
    Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
    Tested-by: Philip Zeyliger <ph...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.cc     | 15 ++++++++------
 be/src/exec/hdfs-scan-node-base.cc       | 19 +++++++++++-------
 be/src/exec/hdfs-scan-node-base.h        | 33 ++++++++++++++++++++-----------
 be/src/exec/hdfs-scan-node.cc            | 34 ++++++++++++++++++++++----------
 be/src/exec/hdfs-scan-node.h             | 13 +++++++-----
 be/src/exec/hdfs-scanner.cc              |  6 ++----
 be/src/exec/hdfs-text-scanner.cc         |  6 ++----
 tests/query_test/test_runtime_filters.py |  6 ++++--
 8 files changed, 82 insertions(+), 50 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index b63a254..4e2dd36 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -67,9 +67,10 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }
-  // Issue the header ranges only. GetNextInternal() will issue the files' scan ranges
-  // and those ranges will need scanner threads, so no files are marked completed yet.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(header_ranges, 0, EnqueueLocation::TAIL));
+  // When the header is parsed, we will issue more AddDiskIoRanges in
+  // the scanner threads.
+  scan_node->UpdateRemainingScanRangeSubmissions(header_ranges.size());
+  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(header_ranges, EnqueueLocation::TAIL));
   return Status::OK();
 }
 
@@ -157,6 +158,7 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
     header_ = state_->obj_pool()->Add(AllocateFileHeader());
     Status status = ReadFileHeader();
     if (!status.ok()) {
+      scan_node_->UpdateRemainingScanRangeSubmissions(-1);
       RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
       // We need to complete the ranges for this file.
       CloseFileRanges(stream_->filename());
@@ -168,8 +170,9 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
     HdfsFileDesc* desc = scan_node_->GetFileDesc(
         context_->partition_descriptor()->id(), stream_->filename());
     // Issue the scan range with priority since it would result in producing a RowBatch.
-    RETURN_IF_ERROR(scan_node_->AddDiskIoRanges(desc, EnqueueLocation::HEAD));
-    return Status::OK();
+    status = scan_node_->AddDiskIoRanges(desc, EnqueueLocation::HEAD);
+    scan_node_->UpdateRemainingScanRangeSubmissions(-1);
+    return status;
   }
   if (eos_) return Status::OK();
 
@@ -328,8 +331,8 @@ void BaseSequenceScanner::CloseFileRanges(const char* filename) {
   const vector<ScanRange*>& splits = desc->splits;
   for (int i = 0; i < splits.size(); ++i) {
     COUNTER_ADD(bytes_skipped_counter_, splits[i]->len());
-    scan_node_->RangeComplete(file_format(), THdfsCompression::NONE);
   }
+  scan_node_->SkipFile(file_format(), desc);
 }
 
 int BaseSequenceScanner::ReadPastSize(int64_t file_offset) {
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 69e46e5..94782b3 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -46,6 +46,7 @@
 #include "util/disk-info.h"
 #include "util/hdfs-util.h"
 #include "util/periodic-counter-updater.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
@@ -221,7 +222,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       file_desc->file_compression = split.file_compression;
       RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
           native_file_path, &file_desc->fs, &fs_cache));
-      num_unqueued_files_.Add(1);
       per_type_files_[partition_desc->file_format()].push_back(file_desc);
     } else {
       // File already processed
@@ -447,6 +447,9 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
 Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
   DCHECK(!initial_ranges_issued_);
   initial_ranges_issued_ = true;
+  // We want to decrement this remaining_scan_range_submissions in all cases.
+  auto remaining_scan_range_submissions_trigger =
+    MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); });
 
   // No need to issue ranges with limit 0.
   if (ReachedLimit()) {
@@ -462,6 +465,8 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
     for (HdfsFileDesc* file: v.second) {
       if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) {
         matching_files->push_back(file);
+      } else {
+        SkipFile(v.first, file);
       }
     }
   }
@@ -490,6 +495,9 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
         DCHECK(false) << "Unexpected file type " << entry.first;
     }
   }
+  // Except for BaseSequenceScanner, IssueInitialRanges() takes care of
+  // issuing all the ranges. For BaseSequenceScanner, IssueInitialRanges()
+  // will have incremented the counter.
   return Status::OK();
 }
 
@@ -504,7 +512,6 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
       static_cast<ScanRangeMetadata*>(file->splits[0]->meta_data());
   if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY,
           filter_ctxs)) {
-    SkipFile(format, file);
     return false;
   }
   return true;
@@ -592,13 +599,11 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 }
 
 Status HdfsScanNodeBase::AddDiskIoRanges(const vector<ScanRange*>& ranges,
-    int num_files_queued, EnqueueLocation enqueue_location) {
+    EnqueueLocation enqueue_location) {
   DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(remaining_scan_range_submissions_.Load(), 0);
   DCHECK_GT(ranges.size(), 0);
-  RETURN_IF_ERROR(reader_context_->AddScanRanges(ranges, enqueue_location));
-  num_unqueued_files_.Add(-num_files_queued);
-  DCHECK_GE(num_unqueued_files_.Load(), 0);
-  return Status::OK();
+  return reader_context_->AddScanRanges(ranges, enqueue_location);
 }
 
 HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(int64_t partition_id, const string& filename) {
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 0fa24d0..5de1768 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -263,23 +263,26 @@ class HdfsScanNodeBase : public ScanNode {
       int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
       bool expected_local, int mtime, const io::ScanRange* original_split = NULL);
 
-  /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan
-  /// ranges have been added completely.  A file's scan ranges are added completely if no
-  /// new scanner threads will be needed to process that file besides the additional
-  /// threads needed to process those in 'ranges'.
-  /// Can be overridden to add scan-node specific actions like starting scanner threads.
+  /// Adds ranges to the io mgr queue. Can be overridden to add scan-node specific
+  /// actions like starting scanner threads. Must not be called once
+  /// remaining_scan_range_submissions_ is 0.
   /// The enqueue_location specifies whether the scan ranges are added to the head or
   /// tail of the queue.
   virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
-      int num_files_queued,
       EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT;
 
-  /// Adds all splits for file_desc to the io mgr queue and indicates one file has
-  /// been added completely. If the enqueue_location is set to HEAD, the scan ranges that
-  /// belong to this file are processed ahead of other scan ranges currently queued.
+  /// Adds all splits for file_desc to the io mgr queue.
   inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc,
       EnqueueLocation enqueue_location = EnqueueLocation::TAIL) WARN_UNUSED_RESULT {
-    return AddDiskIoRanges(file_desc->splits, 1, enqueue_location);
+    return AddDiskIoRanges(file_desc->splits);
+  }
+
+  /// When this counter goes to 0, AddDiskIoRanges() can no longer be called.
+  /// Furthermore, this implies that scanner threads failing to
+  /// acquire a new scan range with StartNextScanRange() can exit.
+  inline void UpdateRemainingScanRangeSubmissions(int32_t delta) {
+    remaining_scan_range_submissions_.Add(delta);
+    DCHECK_GE(remaining_scan_range_submissions_.Load(), 0);
   }
 
   /// Allocates and initializes a new template tuple allocated from pool with values
@@ -459,8 +462,14 @@ class HdfsScanNodeBase : public ScanNode {
   /// this variable.
   bool initial_ranges_issued_ = false;
 
-  /// Number of files that have not been issued from the scanners.
-  AtomicInt32 num_unqueued_files_;
+  /// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
+  /// therefore scanner threads that can't get work should exit. For most
+  /// file formats (except for sequence-based formats), this is 0 after
+  /// IssueInitialRanges(). Note that some scanners (namely Parquet) issue
+  /// additional work to the IO subsystem without using AddDiskIoRanges(),
+  /// but that is managed within the scanner, and doesn't require
+  /// additional scanner threads.
+  AtomicInt32 remaining_scan_range_submissions_ = { 1 };
 
   /// Per scanner type codegen'd fn.
   typedef boost::unordered_map<THdfsFileFormat::type, void*> CodegendFnMap;
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 04fabe8..bcf0d71 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -76,6 +76,7 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
 HdfsScanNode::~HdfsScanNode() {
 }
 
+
 Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   ScopedGetNextEventAdder ea(this, eos);
@@ -108,7 +109,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   Status status = GetNextInternal(state, row_batch, eos);
   if (!status.ok() || *eos) {
     unique_lock<mutex> l(lock_);
-    lock_guard<SpinLock> l2(file_type_counts_);
+    lock_guard<SpinLock> l2(file_type_counts_lock_);
     StopAndFinalizeCounters();
   }
   return status;
@@ -162,6 +163,8 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
   thread_state_.Prepare(this, EstimateScannerThreadMemConsumption());
   scanner_thread_reservations_denied_counter_ =
       ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT);
+  scanner_thread_workless_loops_counter_ =
+      ADD_COUNTER(runtime_profile(), "ScannerThreadWorklessLoops", TUnit::UNIT);
   return Status::OK();
 }
 
@@ -186,12 +189,21 @@ void HdfsScanNode::Close(RuntimeState* state) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
   thread_state_.Close(this);
+#ifndef NDEBUG
+  // At this point, the other threads have been joined, and
+  // remaining_scan_range_submissions_ should be 0, if the
+  // query started and wasn't cancelled or exited early.
+  if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_
+      && progress_.done()) {
+    DCHECK_EQ(remaining_scan_range_submissions_.Load(), 0);
+  }
+#endif
   HdfsScanNodeBase::Close(state);
 }
 
 void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
     const std::vector<THdfsCompression::type>& compression_type, bool skipped) {
-  lock_guard<SpinLock> l(file_type_counts_);
+  lock_guard<SpinLock> l(file_type_counts_lock_);
   HdfsScanNodeBase::RangeComplete(file_type, compression_type, skipped);
 }
 
@@ -206,10 +218,9 @@ void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) {
 }
 
 Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
-    int num_files_queued, EnqueueLocation enqueue_location) {
+    EnqueueLocation enqueue_location) {
+  DCHECK_GT(remaining_scan_range_submissions_.Load(), 0);
   RETURN_IF_ERROR(reader_context_->AddScanRanges(ranges, enqueue_location));
-  num_unqueued_files_.Add(-num_files_queued);
-  DCHECK_GE(num_unqueued_files_.Load(), 0);
   if (!ranges.empty()) ThreadTokenAvailableCb(runtime_state_->resource_pool());
   return Status::OK();
 }
@@ -383,10 +394,10 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     // to return if there's an error.
     ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
 
-    // Take a snapshot of num_unqueued_files_ before calling StartNextScanRange().
-    // We don't want num_unqueued_files_ to go to zero between the return from
+    // Take a snapshot of remaining_scan_range_submissions before calling
+    // StartNextScanRange().  We don't want it to go to zero between the return from
     // StartNextScanRange() and the check for when all ranges are complete.
-    int num_unqueued_files = num_unqueued_files_.Load();
+    int remaining_scan_range_submissions = remaining_scan_range_submissions_.Load();
     ScanRange* scan_range;
     Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
     if (!status.ok()) {
@@ -412,7 +423,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       break;
     }
 
-    if (scan_range == nullptr && num_unqueued_files == 0) {
+    if (scan_range == nullptr && remaining_scan_range_submissions == 0) {
       unique_lock<mutex> l(lock_);
       // All ranges have been queued and DiskIoMgr has no more new ranges for this scan
       // node to process. This means that every range is either done or being processed by
@@ -429,6 +440,8 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       VLOG_QUERY << "Soft memory limit exceeded. Extra scanner thread exiting.";
       break;
     }
+
+    if (scan_range == nullptr) COUNTER_ADD(scanner_thread_workless_loops_counter_, 1);
   }
 
   {
@@ -463,7 +476,8 @@ void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     scan_range->Cancel(Status::CancelledInternal("HDFS partition pruning"));
     HdfsFileDesc* desc = GetFileDesc(partition_id, *scan_range->file_string());
     if (metadata->is_sequence_header) {
-      // File ranges haven't been issued yet, skip entire file
+      // File ranges haven't been issued yet, skip entire file.
+      UpdateRemainingScanRangeSubmissions(-1);
       SkipFile(partition->file_format(), desc);
     } else {
       // Mark this scan range as done.
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 71500a4..c395b6c 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -83,12 +83,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   bool done() const { return done_; }
 
-  /// Adds ranges to the io mgr queue and starts up new scanner threads if possible. The
-  /// enqueue_location parameter determines the location at which the scan ranges are
+  /// Adds ranges to the io mgr queue and starts up new scanner threads if possible.
+  /// The enqueue_location parameter determines the location at which the scan ranges are
   /// added to the queue.
   virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges,
-      int num_files_queued, EnqueueLocation enqueue_location =
-                                EnqueueLocation::TAIL) override WARN_UNUSED_RESULT;
+      EnqueueLocation enqueue_location = EnqueueLocation::TAIL)
+      override WARN_UNUSED_RESULT;
 
   /// Adds a materialized row batch for the scan node.  This is called from scanner
   /// threads. This function will block if the row batch queue is full.
@@ -119,7 +119,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   /// Protects file_type_counts_. Cannot be taken together with any other lock
   /// except lock_, and if so, lock_ must be taken first.
-  SpinLock file_type_counts_;
+  SpinLock file_type_counts_lock_;
 
   /// Flag signaling that all scanner threads are done.  This could be because they
   /// are finished, an error/cancellation occurred, or the limit was reached.
@@ -140,6 +140,9 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// being denied.
   RuntimeProfile::Counter* scanner_thread_reservations_denied_counter_ = nullptr;
 
+  /// Number of times scanner thread didn't find work to do.
+  RuntimeProfile::Counter* scanner_thread_workless_loops_counter_ = nullptr;
+
   /// Compute the estimated memory consumption of a scanner thread in bytes for the
   /// purposes of deciding whether to start a new scanner thread.
   int64_t EstimateScannerThreadMemConsumption() const;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index baab75d..639bac3 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -828,11 +828,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
       }
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all the files
-  // as complete here.
+  // The threads that process the footer will also do the scan.
   if (footer_ranges.size() > 0) {
-    RETURN_IF_ERROR(
-        scan_node->AddDiskIoRanges(footer_ranges, files.size(), EnqueueLocation::TAIL));
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, EnqueueLocation::TAIL));
   }
   return Status::OK();
 }
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index c8009f3..af93974 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -76,7 +76,6 @@ HdfsTextScanner::~HdfsTextScanner() {
 Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   vector<ScanRange*> compressed_text_scan_ranges;
-  int compressed_text_files = 0;
   map<string, vector<HdfsFileDesc*>> plugin_text_files;
   for (int i = 0; i < files.size(); ++i) {
     THdfsCompression::type compression = files[i]->file_compression;
@@ -91,7 +90,6 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       case THdfsCompression::SNAPPY:
       case THdfsCompression::SNAPPY_BLOCKED:
       case THdfsCompression::BZIP2:
-        ++compressed_text_files;
         for (int j = 0; j < files[i]->splits.size(); ++j) {
           // In order to decompress gzip-, snappy- and bzip2-compressed text files, we
           // need to read entire files. Only read a file if we're assigned the first split
@@ -145,8 +143,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     }
   }
   if (compressed_text_scan_ranges.size() > 0) {
-    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(
-        compressed_text_scan_ranges, compressed_text_files, EnqueueLocation::TAIL));
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+          EnqueueLocation::TAIL));
   }
   for (const auto& entry : plugin_text_files) {
     DCHECK_GT(entry.second.size(), 0) << "List should be non-empty";
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index daf7321..320891e 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -56,7 +56,8 @@ class TestRuntimeFilters(ImpalaTestSuite):
     self.run_test_case('QueryTest/runtime_filters_wait', vector)
     duration_s = time.time() - now
     assert duration_s < (WAIT_TIME_MS / 1000), \
-        "Query took too long (%ss, possibly waiting for missing filters?)" % str(duration)
+        "Query took too long (%ss, possibly waiting for missing filters?)" \
+        % str(duration_s)
 
   def test_file_filtering(self, vector):
     if 'kudu' in str(vector.get_value('table_format')):
@@ -93,7 +94,8 @@ class TestBloomFilters(ImpalaTestSuite):
     self.run_test_case('QueryTest/bloom_filters_wait', vector)
     duration_s = time.time() - now
     assert duration_s < (WAIT_TIME_MS / 1000), \
-        "Query took too long (%ss, possibly waiting for missing filters?)" % str(duration)
+        "Query took too long (%ss, possibly waiting for missing filters?)" \
+        % str(duration_s)
 
 
 @SkipIfLocal.multiple_impalad