You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/02/28 08:21:09 UTC

[3/3] impala git commit: IMPALA-6588: don't add empty list of ranges in text scan

IMPALA-6588: don't add empty list of ranges in text scan

DiskIoMgr::AddScanRanges() can return CANCELLED even when adding an
empty list of scan ranges. We should avoid adding empty lists of
scan ranges, particularly after all ranges have been completed, e.g.
if the scanner threads all complete before scan ranges are issued.

The specific race that was possible when adding non-compressed text
ranges was:
1. HdfsTextScanner::IssueInitialRanges() starts executing on thread A and
   issues 1 or more non-compressed ranges.
2. A scanner thread processes all the non-compressed ranges, notices
   completion and calls SetDone(), which cancels reader_context_.
3. HdfsTextScanner::IssueInitialRanges() calls AddScanRanges() with an empty
   list of compressed ranges, which returns CANCELLED because reader_context_
   has already been cancelled.

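This patch fixes HdfsTextScanner::IssueInitialRanges() and
HdfsParquetScanner::IssueInitialRanges() to avoid the anti-pattern of
adding empty lists of scan ranges, and adds DCHECKs in
HdfsScanNodeBase::AddDiskIoRanges() and DiskIoMgr::AddScanRanges() so that
passing an empty list is caught in debug builds.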

Testing:
Ran core tests.

Looped the table sample and scanner tests for a while.

Change-Id: I661a9e234fb87ebdd2596519b44ffba0d7e91d4c
Reviewed-on: http://gerrit.cloudera.org:8080/9456
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d57fbec6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d57fbec6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d57fbec6

Branch: refs/heads/master
Commit: d57fbec6f67b83227b4c6117476da8f7d75fc4f6
Parents: 4c8b02f
Author: Tim Armstrong <ta...@tarmstrong-ubuntu.gce.cloudera.com>
Authored: Mon Feb 26 16:41:59 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 01:45:57 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 10 +++++++---
 be/src/exec/hdfs-scan-node-base.cc  |  1 +
 be/src/exec/hdfs-text-scanner.cc    |  6 ++++--
 be/src/runtime/io/disk-io-mgr.cc    |  1 +
 be/src/runtime/io/disk-io-mgr.h     |  2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 51a39be..574ddb0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -121,9 +121,13 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       }
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all the files
-  // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  // We may not have any scan ranges if this scan node does not have the footer split for
+  // any Parquet file.
+  if (footer_ranges.size() > 0) {
+    // The threads that process the footer will also do the scan, so we mark all the files
+    // as complete here.
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 89f3859..5625389 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -548,6 +548,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
   DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(ranges.size(), 0);
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 253bcc8..2a6a912 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,8 +144,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DCHECK(false);
     }
   }
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
-          compressed_text_files));
+  if (compressed_text_scan_ranges.size() > 0) {
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+        compressed_text_files));
+  }
   if (lzo_text_files.size() > 0) {
     // This will dlopen the lzo binary and can fail if the lzo binary is not present.
     RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 07e02b4..0d2afe2 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -338,6 +338,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
 
 Status DiskIoMgr::AddScanRanges(
     RequestContext* reader, const vector<ScanRange*>& ranges) {
+  DCHECK_GT(ranges.size(), 0);
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 760d0e9..2b6881b 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -256,7 +256,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// Adds the scan ranges to reader's queues, but does not start scheduling it. The range
   /// can be scheduled by a thread calling GetNextUnstartedRange(). This call is
   /// non-blocking. The caller must not deallocate the scan range pointers before
-  /// UnregisterContext().
+  /// UnregisterContext(). 'ranges' must not be empty.
   Status AddScanRanges(
       RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT;