You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/28 16:10:13 UTC

[1/8] impala git commit: IMPALA-6588: don't add empty list of ranges in text scan

Repository: impala
Updated Branches:
  refs/heads/2.x ecbb88da5 -> 9e2b77c08


IMPALA-6588: don't add empty list of ranges in text scan

DiskIoMgr::AddScanRanges() can returned CANCELLED even when adding an
empty list of scan ranges. We should avoid adding empty lists of
scan ranges, specifically after all ranges have been completed, e.g.
if the scanner threads all complete before scan ranges are issued.

The specific race that was possible when added non-compressed text
ranges was:
1. HdfsTextScanner::IssueInitialRanges() starts executing on thread A and
   issues 1 or more non-compressed ranges.
2. A scanner thread processes all the non-compressed ranges, notices
   completion and calls SetDone(), which cancels reader_context_.
3. HdfsTextScanner::IssueInitialRanges() calls AddScanRanges() with empty
  compressed ranges, and returns CANCELLED because the reader_context_ is
  cancelled.

This patch fixes HdfsTextScanner::IssueInitialRanges() to avoid the
anti-pattern of adding empty lists of scan ranges.

Testing:
Ran core tests.

Looped the table sample and scanner tests for a while.

Change-Id: I661a9e234fb87ebdd2596519b44ffba0d7e91d4c
Reviewed-on: http://gerrit.cloudera.org:8080/9456
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9e2b77c0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9e2b77c0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9e2b77c0

Branch: refs/heads/2.x
Commit: 9e2b77c081247daf8fe789e2df49b459d7db71e0
Parents: 0060129
Author: Tim Armstrong <ta...@tarmstrong-ubuntu.gce.cloudera.com>
Authored: Mon Feb 26 16:41:59 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 10 +++++++---
 be/src/exec/hdfs-scan-node-base.cc  |  1 +
 be/src/exec/hdfs-text-scanner.cc    |  6 ++++--
 be/src/runtime/io/disk-io-mgr.cc    |  1 +
 be/src/runtime/io/disk-io-mgr.h     |  2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9e2b77c0/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 51a39be..574ddb0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -121,9 +121,13 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       }
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all the files
-  // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  // We may not have any scan ranges if this scan node does not have the footer split for
+  // any Parquet file.
+  if (footer_ranges.size() > 0) {
+    // The threads that process the footer will also do the scan, so we mark all the files
+    // as complete here.
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9e2b77c0/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b3cc4f0..0dfd6f0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -554,6 +554,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
   DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(ranges.size(), 0);
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/9e2b77c0/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 253bcc8..2a6a912 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,8 +144,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DCHECK(false);
     }
   }
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
-          compressed_text_files));
+  if (compressed_text_scan_ranges.size() > 0) {
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+        compressed_text_files));
+  }
   if (lzo_text_files.size() > 0) {
     // This will dlopen the lzo binary and can fail if the lzo binary is not present.
     RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));

http://git-wip-us.apache.org/repos/asf/impala/blob/9e2b77c0/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 07e02b4..0d2afe2 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -338,6 +338,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
 
 Status DiskIoMgr::AddScanRanges(
     RequestContext* reader, const vector<ScanRange*>& ranges) {
+  DCHECK_GT(ranges.size(), 0);
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));

http://git-wip-us.apache.org/repos/asf/impala/blob/9e2b77c0/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 760d0e9..2b6881b 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -256,7 +256,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// Adds the scan ranges to reader's queues, but does not start scheduling it. The range
   /// can be scheduled by a thread calling GetNextUnstartedRange(). This call is
   /// non-blocking. The caller must not deallocate the scan range pointers before
-  /// UnregisterContext().
+  /// UnregisterContext(). 'ranges' must not be empty.
   Status AddScanRanges(
       RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT;
 


[3/8] impala git commit: IMPALA-6530: Track time spent opening HDFS file handles

Posted by ta...@apache.org.
IMPALA-6530: Track time spent opening HDFS file handles

When the HDFS NameNode is overloaded, opening file
handles can be a significant source of query execution
time. Currently, there is no statistic to track this
time at the HDFS scan node level.

This introduces a statistic "TotalRawHdfsOpenFileTime(*)"
to track the time spent in HdfsOpenFile(). Here is
an example of this statistic populated for the query
"select * from functional_parquet.widetable_1000_cols",
which is dominated by file handle opening time:
- CachedFileHandlesHitCount: 0 (0)
- CachedFileHandlesMissCount: 1.00K (1001)
...
- ScannerThreadsTotalWallClockTime: 980.432ms
  - MaterializeTupleTime(*): 1.759ms
  - ScannerThreadsSysTime: 4.000ms
  - ScannerThreadsUserTime: 56.000ms
- TotalRawHdfsOpenFileTime(*): 894.285ms
- TotalRawHdfsReadTime(*): 25.188ms

To make the TotalRawHdfsReadTime mutually exclusive
from the TotalRawHdfsOpenFileTime, the timer tracking
for the read timer moves from DiskIoMgr::ReadRange()
to inside the ScanRange::Read() function. This allows
it to exclude the portion of ScanRange::Read() that
is getting a file handle from the file handle cache.

Change-Id: Ia560af2d9b12f158e8811900a7b9d98f8e760858
Reviewed-on: http://gerrit.cloudera.org:8080/9370
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e33e3d77
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e33e3d77
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e33e3d77

Branch: refs/heads/2.x
Commit: e33e3d771527713899d5936e5b37c88f45a76be0
Parents: 46eab0a
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Feb 15 16:08:21 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc  |   2 +
 be/src/exec/scan-node.cc            |   1 +
 be/src/exec/scan-node.h             |   3 +
 be/src/runtime/io/disk-io-mgr.cc    |   8 +-
 be/src/runtime/io/disk-io-mgr.h     |  11 ++-
 be/src/runtime/io/request-context.h |   7 ++
 be/src/runtime/io/scan-range.cc     | 126 +++++++++++++++++--------------
 be/src/util/impalad-metrics.cc      |   6 ++
 be/src/util/impalad-metrics.h       |   4 +
 common/thrift/metrics.json          |  10 +++
 10 files changed, 113 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 164197d..b3cc4f0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -361,6 +361,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   // TODO: Revisit counters and move the counters specific to multi-threaded scans
   // into HdfsScanNode.
   read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
+  open_file_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_OPEN_FILE_TIMER);
   per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
       PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
@@ -377,6 +378,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
 
   reader_context_->set_bytes_read_counter(bytes_read_counter());
   reader_context_->set_read_timer(read_timer());
+  reader_context_->set_open_file_timer(open_file_timer());
   reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
   reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 27726f8..8a18a0c 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -38,6 +38,7 @@ const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
 const string ScanNode::ROWS_READ_COUNTER = "RowsRead";
 const string ScanNode::COLLECTION_ITEMS_READ_COUNTER = "CollectionItemsRead";
 const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
+const string ScanNode::TOTAL_HDFS_OPEN_FILE_TIMER = "TotalRawHdfsOpenFileTime(*)";
 const string ScanNode::TOTAL_HBASE_READ_TIMER = "TotalRawHBaseReadTime(*)";
 const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
 const string ScanNode::MATERIALIZE_TUPLE_TIMER = "MaterializeTupleTime(*)";

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 0976e27..6e137af 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -108,6 +108,7 @@ class ScanNode : public ExecNode {
     return collection_items_read_counter_;
   }
   RuntimeProfile::Counter* read_timer() const { return read_timer_; }
+  RuntimeProfile::Counter* open_file_timer() const { return open_file_timer_; }
   RuntimeProfile::Counter* total_throughput_counter() const {
     return total_throughput_counter_;
   }
@@ -135,6 +136,7 @@ class ScanNode : public ExecNode {
   static const std::string ROWS_READ_COUNTER;
   static const std::string COLLECTION_ITEMS_READ_COUNTER;
   static const std::string TOTAL_HDFS_READ_TIMER;
+  static const std::string TOTAL_HDFS_OPEN_FILE_TIMER;
   static const std::string TOTAL_HBASE_READ_TIMER;
   static const std::string TOTAL_THROUGHPUT_COUNTER;
   static const std::string PER_READ_THREAD_THROUGHPUT_COUNTER;
@@ -168,6 +170,7 @@ class ScanNode : public ExecNode {
   /// [(2, [(3)]), (4, [])] this counter will be 3: (2, [(3)]), (3) and (4, [])
   RuntimeProfile::Counter* collection_items_read_counter_;
   RuntimeProfile::Counter* read_timer_; // total read time
+  RuntimeProfile::Counter* open_file_timer_; // total time spent opening file handles
   /// Wall based aggregate read throughput [bytes/sec]
   RuntimeProfile::Counter* total_throughput_counter_;
   /// Per thread read throughput [bytes/sec]

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 0ac3669..07e02b4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -740,8 +740,6 @@ void DiskIoMgr::ReadRange(
     // Update counters.
     COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
     COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
-    SCOPED_TIMER(&read_timer_);
-    SCOPED_TIMER(reader->read_timer_);
 
     read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
         &buffer_desc->len_, &buffer_desc->eosr_);
@@ -841,6 +839,7 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
 
 ExclusiveHdfsFileHandle* DiskIoMgr::GetExclusiveHdfsFileHandle(const hdfsFS& fs,
     std::string* fname, int64_t mtime, RequestContext *reader) {
+  SCOPED_TIMER(reader->open_file_timer_);
   ExclusiveHdfsFileHandle* fid = new ExclusiveHdfsFileHandle(fs, fname->data(), mtime);
   if (!fid->ok()) {
     VLOG_FILE << "Opening the file " << fname << " failed.";
@@ -864,6 +863,7 @@ void DiskIoMgr::ReleaseExclusiveHdfsFileHandle(ExclusiveHdfsFileHandle* fid) {
 CachedHdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
     std::string* fname, int64_t mtime, RequestContext *reader) {
   bool cache_hit;
+  SCOPED_TIMER(reader->open_file_timer_);
   CachedHdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, false,
       &cache_hit);
   if (fh == nullptr) return nullptr;
@@ -887,8 +887,10 @@ void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname,
 }
 
 Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
-    int64_t mtime, CachedHdfsFileHandle** fid) {
+    int64_t mtime, RequestContext* reader, CachedHdfsFileHandle** fid) {
   bool cache_hit;
+  SCOPED_TIMER(reader->open_file_timer_);
+  ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED->Increment(1L);
   file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
   // The old handle has been destroyed, so *fid must be overwritten before returning.
   *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index d246e95..760d0e9 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -343,7 +343,8 @@ class DiskIoMgr : public CacheLineAligned {
   bool Validate() const;
 
   /// Given a FS handle, name and last modified time of the file, construct a new
-  /// ExclusiveHdfsFileHandle. In the case of an error, returns nullptr.
+  /// ExclusiveHdfsFileHandle. This records the time spent opening the handle in
+  /// 'reader' and counts this as a cache miss. In the case of an error, returns nullptr.
   ExclusiveHdfsFileHandle* GetExclusiveHdfsFileHandle(const hdfsFS& fs,
       std::string* fname, int64_t mtime, RequestContext* reader);
 
@@ -351,7 +352,8 @@ class DiskIoMgr : public CacheLineAligned {
   void ReleaseExclusiveHdfsFileHandle(ExclusiveHdfsFileHandle* fid);
 
   /// Given a FS handle, name and last modified time of the file, gets a
-  /// CachedHdfsFileHandle from the file handle cache. On success, records statistics
+  /// CachedHdfsFileHandle from the file handle cache. Records the time spent
+  /// opening the handle in 'reader'. On success, records statistics
   /// about whether this was a cache hit or miss in the 'reader' as well as at the
   /// system level. In case of an error returns nullptr.
   CachedHdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
@@ -361,9 +363,10 @@ class DiskIoMgr : public CacheLineAligned {
   void ReleaseCachedHdfsFileHandle(std::string* fname, CachedHdfsFileHandle* fid);
 
   /// Reopens a file handle by destroying the file handle and getting a fresh
-  /// file handle from the cache. Returns an error if the file could not be reopened.
+  /// file handle from the cache. Records the time spent reopening the handle
+  /// in 'reader'. Returns an error if the file could not be reopened.
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      CachedHdfsFileHandle** fid);
+      RequestContext* reader, CachedHdfsFileHandle** fid);
 
   /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
   /// disk ID (i.e. disk_queue_ index) of num_local_disks().

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 24fd0fc..b028596 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -128,6 +128,10 @@ class RequestContext {
 
   void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
 
+  void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
+    open_file_timer_ = open_file_timer;
+  }
+
   void set_active_read_thread_counter(
       RuntimeProfile::Counter* active_read_thread_counter) {
    active_read_thread_counter_ = active_read_thread_counter;
@@ -245,6 +249,9 @@ class RequestContext {
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter* read_timer_ = nullptr;
 
+  /// Total time spent open hdfs file handles
+  RuntimeProfile::Counter* open_file_timer_ = nullptr;
+
   /// Number of active read threads
   RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 9c2110c..4f7c38b 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -503,73 +503,83 @@ Status ScanRange::Read(
 
     int64_t max_chunk_size = MaxReadChunkSize();
     Status status = Status::OK();
-    while (*bytes_read < bytes_to_read) {
-      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
-      DCHECK_GE(chunk_size, 0);
-      // The hdfsRead() length argument is an int.
-      DCHECK_LE(chunk_size, numeric_limits<int>::max());
-      int current_bytes_read = -1;
-      // bytes_read_ is only updated after the while loop
-      int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
-      int num_retries = 0;
-      while (true) {
-        status = Status::OK();
-        // For file handles from the cache, any of the below file operations may fail
-        // due to a bad file handle. In each case, record the error, but allow for a
-        // retry to fix it.
-        if (FLAGS_use_hdfs_pread) {
-          current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
-              buffer + *bytes_read, chunk_size);
-          if (current_bytes_read == -1) {
-            status = Status(TErrorCode::DISK_IO_ERROR,
-                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-          }
-        } else {
-          // If the file handle is borrowed, it may not be at the appropriate
-          // location. Seek to the appropriate location.
-          bool seek_failed = false;
-          if (borrowed_hdfs_fh != nullptr) {
-            if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
-              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
-                  " in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
-              seek_failed = true;
-            }
-          }
-          if (!seek_failed) {
-            current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
-                chunk_size);
+    {
+      ScopedTimer<MonotonicStopWatch> io_mgr_read_timer(&io_mgr_->read_timer_);
+      ScopedTimer<MonotonicStopWatch> req_context_read_timer(reader_->read_timer_);
+      while (*bytes_read < bytes_to_read) {
+        int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
+        DCHECK_GE(chunk_size, 0);
+        // The hdfsRead() length argument is an int.
+        DCHECK_LE(chunk_size, numeric_limits<int>::max());
+        int current_bytes_read = -1;
+        // bytes_read_ is only updated after the while loop
+        int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
+        int num_retries = 0;
+        while (true) {
+          status = Status::OK();
+          // For file handles from the cache, any of the below file operations may fail
+          // due to a bad file handle. In each case, record the error, but allow for a
+          // retry to fix it.
+          if (FLAGS_use_hdfs_pread) {
+            current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
+                buffer + *bytes_read, chunk_size);
             if (current_bytes_read == -1) {
               status = Status(TErrorCode::DISK_IO_ERROR,
                   GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
             }
+          } else {
+            // If the file handle is borrowed, it may not be at the appropriate
+            // location. Seek to the appropriate location.
+            bool seek_failed = false;
+            if (borrowed_hdfs_fh != nullptr) {
+              if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
+                status = Status(TErrorCode::DISK_IO_ERROR,
+                  Substitute("Error seeking to $0 in file: $1: $2", position_in_file,
+                      file_, GetHdfsErrorMsg("")));
+                seek_failed = true;
+              }
+            }
+            if (!seek_failed) {
+              current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
+                  chunk_size);
+              if (current_bytes_read == -1) {
+                status = Status(TErrorCode::DISK_IO_ERROR,
+                    GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+              }
+            }
           }
-        }
 
-        // Do not retry:
-        // - if read was successful (current_bytes_read != -1)
-        // - or if already retried once
-        // - or if this not using a borrowed file handle
-        DCHECK_LE(num_retries, 1);
-        if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
-            num_retries == 1) {
+          // Do not retry:
+          // - if read was successful (current_bytes_read != -1)
+          // - or if already retried once
+          // - or if this not using a borrowed file handle
+          DCHECK_LE(num_retries, 1);
+          if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
+              num_retries == 1) {
+            break;
+          }
+          // The error may be due to a bad file handle. Reopen the file handle and retry.
+          // Exclude this time from the read timers.
+          io_mgr_read_timer.Stop();
+          req_context_read_timer.Stop();
+          ++num_retries;
+          RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
+              mtime(), reader_, &borrowed_hdfs_fh));
+          hdfs_file = borrowed_hdfs_fh->file();
+          io_mgr_read_timer.Start();
+          req_context_read_timer.Start();
+        }
+        if (!status.ok()) break;
+        if (current_bytes_read == 0) {
+          // No more bytes in the file. The scan range went past the end.
+          *eosr = true;
           break;
         }
-        // The error may be due to a bad file handle. Reopen the file handle and retry.
-        ++num_retries;
-        RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
-            mtime(), &borrowed_hdfs_fh));
-        hdfs_file = borrowed_hdfs_fh->file();
-      }
-      if (!status.ok()) break;
-      if (current_bytes_read == 0) {
-        // No more bytes in the file. The scan range went past the end.
-        *eosr = true;
-        break;
-      }
-      *bytes_read += current_bytes_read;
+        *bytes_read += current_bytes_read;
 
-      // Collect and accumulate statistics
-      GetHdfsStatistics(hdfs_file);
+        // Collect and accumulate statistics
+        GetHdfsStatistics(hdfs_file);
+      }
     }
 
     if (borrowed_hdfs_fh != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 32320d8..815e4af 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -66,6 +66,8 @@ const char* ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT =
     "impala-server.io.mgr.cached-file-handles-hit-count";
 const char* ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT =
     "impala-server.io.mgr.cached-file-handles-miss-count";
+const char* ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_REOPENED =
+    "impala-server.io.mgr.cached-file-handles-reopened";
 const char* ImpaladMetricKeys::CATALOG_NUM_DBS =
     "catalog.num-databases";
 const char* ImpaladMetricKeys::CATALOG_NUM_TABLES =
@@ -117,6 +119,7 @@ IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = NULL;
 IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = NULL;
+IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = NULL;
 IntCounter* ImpaladMetrics::HEDGED_READ_OPS = NULL;
 IntCounter* ImpaladMetrics::HEDGED_READ_OPS_WIN = NULL;
 
@@ -215,6 +218,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT, 0);
 
+  IO_MGR_CACHED_FILE_HANDLES_REOPENED = m->AddCounter(
+      ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_REOPENED, 0);
+
   IO_MGR_BYTES_READ = m->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
   IO_MGR_LOCAL_BYTES_READ = m->AddCounter(
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index a62c4c6..7de7aa8 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -97,6 +97,9 @@ class ImpaladMetricKeys {
   /// Number of cache misses for cached HDFS file handles
   static const char* IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT;
 
+  /// Number of cached file handles that hit an error and were reopened
+  static const char* IO_MGR_CACHED_FILE_HANDLES_REOPENED;
+
   /// Number of DBs in the catalog
   static const char* CATALOG_NUM_DBS;
 
@@ -174,6 +177,7 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_CACHED_BYTES_READ;
   static IntCounter* IO_MGR_SHORT_CIRCUIT_BYTES_READ;
   static IntCounter* IO_MGR_BYTES_WRITTEN;
+  static IntCounter* IO_MGR_CACHED_FILE_HANDLES_REOPENED;
   static IntCounter* HEDGED_READ_OPS;
   static IntCounter* HEDGED_READ_OPS_WIN;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e33e3d77/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 6328cd4..cce34d7 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1575,6 +1575,16 @@
     "key": "impala-server.io.mgr.cached-file-handles-miss-count"
   },
   {
+    "description": "Number of cached HDFS file handles reopened",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HDFS cached file handles reopened",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala-server.io.mgr.cached-file-handles-reopened"
+  },
+  {
     "description": "The number of active scratch directories for spilling to disk.",
     "contexts": [
       "IMPALAD"


[4/8] impala git commit: IMPALA-6586: Fix bug in TestGetTablesTypeTable()

Posted by ta...@apache.org.
IMPALA-6586: Fix bug in TestGetTablesTypeTable()

The bug in FrontendTest.TestGetTablesTypeTable() was
that it did not explicitly load views that the test
assumed to be loaded already. The test needs to
distinguish between views and tables and views need
to be loaded for them to be discernable from tables.

I was able to reproduce the issue localy by just
running FrontendTest.TestGetTablesTypeTable() without
any other test.

Testing:
- locally ran all tests in FrontendTest individually
  (with a fresh ImpaladTestCatalog)

Change-Id: Idf0bddb2e29209adda5bda5ddc428f46f241c8c9
Reviewed-on: http://gerrit.cloudera.org:8080/9453
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e9d3fc28
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e9d3fc28
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e9d3fc28

Branch: refs/heads/2.x
Commit: e9d3fc2829d941811cb020e76d779e65bbe35b31
Parents: e33e3d7
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Feb 26 12:02:01 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/service/FrontendTest.java | 23 +++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e9d3fc28/fe/src/test/java/org/apache/impala/service/FrontendTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index de8b7ac..c7fee57 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -104,6 +104,12 @@ public class FrontendTest extends FrontendTestBase {
 
   @Test
   public void TestGetTablesTypeTable() throws ImpalaException {
+    // Make sure these views are loaded so they can be distinguished from tables.
+    AnalyzesOk("select * from functional.alltypes_hive_view");
+    AnalyzesOk("select * from functional.alltypes_parens");
+    AnalyzesOk("select * from functional.alltypes_view");
+    AnalyzesOk("select * from functional.alltypes_view_sub");
+
     TMetadataOpRequest req = new TMetadataOpRequest();
     req.opcode = TMetadataOpcode.GET_TABLES;
     req.get_tables_req = new TGetTablesReq();
@@ -115,12 +121,13 @@ public class FrontendTest extends FrontendTestBase {
     assertEquals(5, resp.schema.columns.size());
     assertEquals(5, resp.rows.get(0).colVals.size());
     assertEquals(1, resp.rows.size());
-    assertEquals("alltypes_datasource", resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_datasource",
+        resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
   }
 
   @Test
   public void TestGetTablesTypeView() throws ImpalaException {
-    // Issue the statements to make sure all the views are loaded
+    // Make sure these views are loaded so they can be distinguished from tables.
     AnalyzesOk("select * from functional.alltypes_hive_view");
     AnalyzesOk("select * from functional.alltypes_parens");
     AnalyzesOk("select * from functional.alltypes_view");
@@ -137,10 +144,14 @@ public class FrontendTest extends FrontendTestBase {
     assertEquals(5, resp.schema.columns.size());
     assertEquals(5, resp.rows.get(0).colVals.size());
     assertEquals(4, resp.rows.size());
-    assertEquals("alltypes_hive_view", resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
-    assertEquals("alltypes_parens", resp.rows.get(1).colVals.get(2).string_val.toLowerCase());
-    assertEquals("alltypes_view", resp.rows.get(2).colVals.get(2).string_val.toLowerCase());
-    assertEquals("alltypes_view_sub", resp.rows.get(3).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_hive_view",
+        resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_parens",
+        resp.rows.get(1).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_view",
+        resp.rows.get(2).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_view_sub",
+        resp.rows.get(3).colVals.get(2).string_val.toLowerCase());
   }
 
   @Test


[6/8] impala git commit: IMPALA-6584: Fix TestKuduOperations::test_column_storage_attributes

Posted by ta...@apache.org.
IMPALA-6584: Fix TestKuduOperations::test_column_storage_attributes

In test_column_storage_attributes the column type is also used as
the column name. To ensure the type name is also a valid column
name, the default decimal type is now used.

Change-Id: Ia7a8c3a07e50e1b7e89192d15380caf8550e4945
Reviewed-on: http://gerrit.cloudera.org:8080/9455
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/981c6f28
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/981c6f28
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/981c6f28

Branch: refs/heads/2.x
Commit: 981c6f2808200f6f64231e310e62eed91935d575
Parents: e9d3fc2
Author: Grant Henke <gh...@cloudera.com>
Authored: Sun Feb 25 13:37:31 2018 -0600
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 tests/query_test/test_kudu.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/981c6f28/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 3d7b727..b0ced46 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -357,7 +357,7 @@ class TestKuduOperations(KuduTestSuite):
       pytest.skip("Only runs in exhaustive to reduce core time.")
     table_name = "%s.storage_attrs" % unique_database
     types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double', \
-        'string', 'timestamp', 'decimal(9, 2)', 'decimal(18)', 'decimal(38, 38)']
+        'string', 'timestamp', 'decimal']
 
     create_query = "create table %s (id int primary key" % table_name
     for t in types:
@@ -378,10 +378,10 @@ class TestKuduOperations(KuduTestSuite):
           except Exception as err:
             assert "encoding %s not supported for type" % e in str(err)
         cursor.execute("""insert into %s values (%s, true, 0, 0, 0, 0, 0, 0, '0',
-            cast('2009-01-01' as timestamp))""" % (table_name, i))
+            cast('2009-01-01' as timestamp), cast(0 as decimal))""" % (table_name, i))
         cursor.execute("select * from %s where id = %s" % (table_name, i))
         assert cursor.fetchall() == \
-            [(i, True, 0, 0, 0, 0, 0.0, 0.0, '0', datetime(2009, 1, 1, 0, 0))]
+            [(i, True, 0, 0, 0, 0, 0.0, 0.0, '0', datetime(2009, 1, 1, 0, 0), 0)]
         i += 1
     cursor.execute("select count(*) from %s" % table_name)
     print cursor.fetchall() == [(i, )]


[2/8] impala git commit: IMPALA-6512: Maintenace thread period should respect FLAGS_datastream_sender_timeout_ms

Posted by ta...@apache.org.
IMPALA-6512: Maintenace thread period should respect FLAGS_datastream_sender_timeout_ms

Previously, the maintenance thread in KrpcDataStreamMgr will wake up
once every 10s to check for early senders which timed out. However,
FLAGS_datastream_sender_timeout_ms can be set to a value smaller than
10s. In which case, we may not notice the timed-out senders until much
later. This change fixes the problem by changing the wakeup period of
the maintenance thread to be min of FLAGS_datastream_sender_timeout_ms/2
and 10000 milliseconds. Also, this change addresses a TODO in the code by
moving the check for closed receivers in the 'closed_stream_cache_' from
the handler for EOS RPC to the maintenance thread's loop.

Testing done: test_exchange_large_delay.py which failed previously
when KRPC is enabled. Core debug builds with KRPC enabled.

Change-Id: I804cef7cc991007ec44375f8eac804aa2df46bd7
Reviewed-on: http://gerrit.cloudera.org:8080/9447
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bbe71929
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bbe71929
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bbe71929

Branch: refs/heads/2.x
Commit: bbe7192924c5ae38c9cb0d6cddef37aa80a9be9f
Parents: b289f14
Author: Michael Ho <kw...@cloudera.com>
Authored: Sat Feb 24 14:14:54 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc     |  1 +
 be/src/runtime/krpc-data-stream-mgr.cc | 48 ++++++++++++++---------------
 2 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bbe71929/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 11374a2..bd76b57 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -699,6 +699,7 @@ class DataStreamTestThriftOnly : public DataStreamTest {
 class DataStreamTestShortDeserQueue : public DataStreamTest {
  protected:
   virtual void SetUp() {
+    FLAGS_datastream_sender_timeout_ms = 10000;
     FLAGS_datastream_service_num_deserialization_threads = 1;
     FLAGS_datastream_service_deserialization_queue_size = 1;
     DataStreamTest::SetUp();

http://git-wip-us.apache.org/repos/asf/impala/blob/bbe71929/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 184ec49..02609c8 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -285,27 +285,6 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
   RespondAndReleaseRpc(Status::OK(), response, rpc_context, service_mem_tracker_);
-
-  {
-    // TODO: Move this to maintenance thread.
-    // Remove any closed streams that have been in the cache for more than
-    // STREAM_EXPIRATION_TIME_MS.
-    lock_guard<mutex> l(lock_);
-    ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
-    int64_t now = MonotonicMillis();
-    int32_t before = closed_stream_cache_.size();
-    while (it != closed_stream_expirations_.end() && it->first < now) {
-      closed_stream_cache_.erase(it->second);
-      closed_stream_expirations_.erase(it++);
-    }
-    DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
-    int32_t after = closed_stream_cache_.size();
-    if (before != after) {
-      VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
-                 << ", eviction took: "
-                 << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
-    }
-  }
 }
 
 Status KrpcDataStreamMgr::DeregisterRecvr(
@@ -385,12 +364,15 @@ void KrpcDataStreamMgr::RespondAndReleaseRpc(const Status& status,
 }
 
 void KrpcDataStreamMgr::Maintenance() {
+  const int32_t sleep_time_ms =
+      min(max(1, FLAGS_datastream_sender_timeout_ms / 2), 10000);
   while (true) {
+    const int64_t now = MonotonicMillis();
+
     // Notify any senders that have been waiting too long for their receiver to
     // appear. Keep lock_ held for only a short amount of time.
     vector<EarlySendersList> timed_out_senders;
     {
-      int64_t now = MonotonicMillis();
       lock_guard<mutex> l(lock_);
       auto it = early_senders_map_.begin();
       while (it != early_senders_map_.end()) {
@@ -415,9 +397,27 @@ void KrpcDataStreamMgr::Maintenance() {
         RespondToTimedOutSender<EndDataStreamCtx, EndDataStreamRequestPB>(ctx);
       }
     }
+
+    // Remove any closed streams that have been in the cache for more than
+    // STREAM_EXPIRATION_TIME_MS.
+    {
+      lock_guard<mutex> l(lock_);
+      ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
+      int32_t before = closed_stream_cache_.size();
+      while (it != closed_stream_expirations_.end() && it->first < now) {
+        closed_stream_cache_.erase(it->second);
+        closed_stream_expirations_.erase(it++);
+      }
+      DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
+      int32_t after = closed_stream_cache_.size();
+      if (before != after) {
+        VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
+                   << ", eviction took: "
+                   << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
+      }
+    }
     bool timed_out = false;
-    // Wait for 10s
-    shutdown_promise_.Get(10000, &timed_out);
+    shutdown_promise_.Get(sleep_time_ms, &timed_out);
     if (!timed_out) return;
   }
 }


[8/8] impala git commit: IMPALA-2990: Add a warning message during cancellation

Posted by ta...@apache.org.
IMPALA-2990: Add a warning message during cancellation

Until IMPALA-2990 is fixed, there is no easy way to
prevent query hangs due to cancellation of fragment
instances in a backend node when it fails to report
status to the coordinator. Given it's rather hard to
diagnose IMPALA-2990, this change adds a log statement
at the point of cancellation to warn about IMPALA-2990.
This hopefully makes diagnostics slightly easier.

Change-Id: Icfd56edfe74707592f4dd9c840550b13cb80e9a0
Reviewed-on: http://gerrit.cloudera.org:8080/9413
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b289f14d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b289f14d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b289f14d

Branch: refs/heads/2.x
Commit: b289f14dc2943f20ec2d2f7c6871e16e12053bdb
Parents: 981c6f2
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 22 17:37:01 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b289f14d/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5a11caf..b290784 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -274,8 +274,13 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
     // abort with CANCELLED status, despite there being an error
-    // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
-    // hang as the coordinator may not be aware of the cancellation.
+    if (!rpc_status.ok()) {
+      // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
+      // hang as the coordinator may not be aware of the cancellation. Remove the log
+      // statement once IMPALA-2990 is fixed.
+      LOG(ERROR) << "Cancelling fragment instances due to failure to report status. "
+                 << "Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    }
     Cancel();
   }
 }


[5/8] impala git commit: IMPALA-6543: Limit RowBatch serialization size to INT_MAX

Posted by ta...@apache.org.
IMPALA-6543: Limit RowBatch serialization size to INT_MAX

The serialization format of a row batch relies on
tuple offsets. In its current form, the tuple offsets
are int32s. This means that it is impossible to generate
a valid serialization of a row batch that is larger
than INT_MAX.

This changes RowBatch::SerializeInternal() to return an
error if trying to serialize a row batch larger than INT_MAX.
This prevents a DCHECK on debug builds when creating a row
larger than 2GB.

This also changes the compression logic in RowBatch::Serialize()
to avoid a DCHECK if LZ4 will not be able to compress the
row batch. Instead, it returns an error.

This modifies row-batch-serialize-test to verify behavior at
each of the limits. Specifically:
RowBatches up to size LZ4_MAX_INPUT_SIZE succeed.
RowBatches with size range [LZ4_MAX_INPUT_SIZE+1, INT_MAX]
fail on LZ4 compression.
RowBatches with size > INT_MAX fail with RowBatch too large.

Change-Id: I3b022acdf3bc93912d6d98829b30e44b65890d91
Reviewed-on: http://gerrit.cloudera.org:8080/9367
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/46eab0a1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/46eab0a1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/46eab0a1

Branch: refs/heads/2.x
Commit: 46eab0a1652b1ded2c3aa9b52e051499b86bd0a1
Parents: ecbb88d
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue Feb 20 11:38:23 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------

----------------------------------------------------------------------



[7/8] impala git commit: IMPALA-6565: Fix some bugs in KprcDataStreamRecvr

Posted by ta...@apache.org.
IMPALA-6565: Fix some bugs in KprcDataStreamRecvr

This change fixes a couple of issues found with stress test
(concurrent_select.py)

1. The local variable 'status' was mistakenly defined twice in
DequeueDeferredRpc(): one in the outer scope and one in the
inner scope. This causes the error status of AddBatchWork()
to be dropped when the inner scope ends. As a result, the error
status from AddBatchWork() (e.g. MemLimitExceeded) will not be
propagated back to the sender and the receiver will continue
to operate with some missing data, leading to wrong query
results. This change fixes the problem by removing the redefinition
of the status local variable. It also adds some counters in the
profile to make diagnostics of failed RPCs or missing EOS easier.

2. AddBatchWork() may return early without enqueuing a row
batch if it encounters a failure creating a row batch. The bug
is that it may return early without notifying threads waiting
on 'data_arrival_cv_', causing threads waiting for
'num_pending_enqueue_' to 0 to wait indefinitely. This may
cause some fragment instances to stick around after the query
has been cancelled.

3. This one is not mainfesting during stress test and it's benign.
There is a missing check for 'is_cancelled_' in TakeOverEarlySenders()
before enqueuing an early sender into the deferred_rpc_ queue.
The bug is benign because TakeOverEarlySenders() is called from
in fragment execution thread context so it cannot call close the
receiver and call CancelStream() on the receiver until it returns
from CreateRecvr() unless the query is cancelled. In which case,
the sender should also get the cancellation. That said, it should
be fixed. This change checks for the 'is_cancelled_' flag before
enqueuing an early sender into the 'deferred_rpc_' queue.

Testing done: Stress test consistently reproduced the problems before.
With this fix, no wrong results have been seen after 2 iterations
of stress tests, which translates to about 20000 queries being run.

Change-Id: I6b2985a47021ebd4a970861040e7474aca7941b5
Reviewed-on: http://gerrit.cloudera.org:8080/9439
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/00601296
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/00601296
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/00601296

Branch: refs/heads/2.x
Commit: 006012968a3bbeb39a0a464aa37bb01bc45ec57c
Parents: bbe7192
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 22 16:19:50 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 03:31:02 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/krpc-data-stream-recvr.cc  | 34 ++++++++++++++++++++------
 be/src/runtime/krpc-data-stream-recvr.h   | 19 +++++++++++---
 be/src/runtime/krpc-data-stream-sender.cc |  5 ++++
 be/src/runtime/krpc-data-stream-sender.h  |  6 +++++
 4 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/00601296/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 2f379f4..4f85996 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -298,7 +298,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
-  COUNTER_ADD(recvr_->num_accepted_batches_, 1);
+  COUNTER_ADD(recvr_->num_received_batches_, 1);
   COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
@@ -324,9 +324,11 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   --num_pending_enqueue_;
   if (UNLIKELY(!status.ok())) {
     recvr_->num_buffered_bytes_.Add(-batch_size);
+    data_arrival_cv_.notify_one();
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+  COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
   return Status::OK();
@@ -347,6 +349,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   kudu::Slice tuple_offsets;
   kudu::Slice tuple_data;
   int64_t batch_size;
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
       &batch_size);
   if (UNLIKELY(!status.ok())) {
@@ -400,15 +403,16 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     DCHECK_GT(num_deserialize_tasks_pending_, 0);
     --num_deserialize_tasks_pending_;
 
-    // Returns if the queue has been cancelled or if it's empty.
-    if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return;
+    // Returns if the queue is empty. The queue may be drained in Cancel().
+    if (deferred_rpcs_.empty()) return;
+    DCHECK(!is_cancelled_);
 
     // Try enqueuing the first entry into 'batch_queue_'.
     ctx.swap(deferred_rpcs_.front());
     kudu::Slice tuple_offsets;
     kudu::Slice tuple_data;
     int64_t batch_size;
-    Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
+    status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
@@ -429,6 +433,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     deferred_rpcs_.pop();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    DCHECK(!status.ok() || !batch_queue_.empty());
   }
 
   // Responds to the sender to ack the insertion of the row batches.
@@ -439,12 +444,17 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
   recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
-  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
+    if (UNLIKELY(is_cancelled_)) {
+      RespondAndReleaseRpc(Status::OK(), ctx);
+      return;
+    }
     deferred_rpcs_.push(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
+  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
       recvr_->dest_node_id(), sender_id, 1);
 }
@@ -489,6 +499,9 @@ void KrpcDataStreamRecvr::SenderQueue::Close() {
   // risk running into a race which can leak row batches. Please see IMPALA-3034.
   DCHECK(is_cancelled_);
 
+  // The deferred RPCs should all have been responded to in Cancel().
+  DCHECK(deferred_rpcs_.empty());
+
   // Wait for any pending insertion to complete first.
   while (num_pending_enqueue_ > 0) data_arrival_cv_.wait(l);
 
@@ -566,10 +579,16 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   inactive_timer_ = profile_->inactive_timer();
   num_early_senders_ =
       ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
+  num_arrived_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
+  num_received_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
+  num_enqueued_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
   num_deferred_batches_ =
       ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
-  num_accepted_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT);
+  num_eos_received_ =
+      ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -600,6 +619,7 @@ void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
 void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   sender_queues_[use_sender_id]->DecrementSenders();
+  COUNTER_ADD(num_eos_received_, 1);
 }
 
 void KrpcDataStreamRecvr::CancelStream() {

http://git-wip-us.apache.org/repos/asf/impala/blob/00601296/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index f4c2a5e..758079b 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -195,7 +195,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   RuntimeProfile* recvr_side_profile_;
   RuntimeProfile* sender_side_profile_;
 
-  /// Number of bytes received.
+  /// Number of bytes received but not necessarily enqueued.
   RuntimeProfile::Counter* bytes_received_counter_;
 
   /// Time series of number of bytes received, samples bytes_received_counter_
@@ -211,11 +211,22 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// TODO: Turn this into a wall-clock timer.
   RuntimeProfile::Counter* first_batch_wait_total_timer_;
 
-  /// Total number of batches received and deferred as sender queue is full.
+  /// Total number of batches which arrived at this receiver but not necessarily received
+  /// or enqueued. An arrived row batch will eventually be received if there is no error
+  /// unpacking the RPC payload and the receiving stream is not cancelled.
+  RuntimeProfile::Counter* num_arrived_batches_;
+
+  /// Total number of batches received but not necessarily enqueued.
+  RuntimeProfile::Counter* num_received_batches_;
+
+  /// Total number of batches received and enqueued into the row batch queue.
+  RuntimeProfile::Counter* num_enqueued_batches_;
+
+  /// Total number of batches deferred because of early senders or full row batch queue.
   RuntimeProfile::Counter* num_deferred_batches_;
 
-  /// Total number of batches received and accepted into the sender queue.
-  RuntimeProfile::Counter* num_accepted_batches_;
+  /// Total number of EOS received.
+  RuntimeProfile::Counter* num_eos_received_;
 
   /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
   RuntimeProfile::Counter* data_arrival_timer_;

http://git-wip-us.apache.org/repos/asf/impala/blob/00601296/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6c0ad01..d758826 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -306,6 +306,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
 }
 
 void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
+  if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1);
   rpc_status_ = status;
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
@@ -535,9 +536,11 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
     std::unique_lock<SpinLock> l(lock_);
     RETURN_IF_ERROR(WaitForRpc(&l));
     DCHECK(!rpc_in_flight_);
+    DCHECK(rpc_status_.ok());
     if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
     VLOG_RPC << "calling EndDataStream() to terminate channel.";
     rpc_in_flight_ = true;
+    COUNTER_ADD(parent_->eos_sent_counter_, 1);
     RETURN_IF_ERROR(DoEndDataStreamRpc());
     RETURN_IF_ERROR(WaitForRpc(&l));
   }
@@ -619,7 +622,9 @@ Status KrpcDataStreamSender::Prepare(
       &partition_expr_evals_));
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
+  rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
   bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
   total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/00601296/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index ce0851e..0c3a32e 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,9 +157,15 @@ class KrpcDataStreamSender : public DataSink {
   /// Number of TransmitData() RPC retries due to remote service being busy.
   RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
 
+  /// Total number of times RPC fails or the remote responds with a non-retryable error.
+  RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
+
   /// Total number of bytes sent.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
+  /// Total number of EOS sent.
+  RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
+
   /// Total number of bytes of the row batches before compression.
   RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;