Posted to commits@impala.apache.org by jo...@apache.org on 2022/11/06 00:23:25 UTC

[impala] branch master updated: IMPALA-11704: Delay hdfsOpenFile with data cache

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d1d4f183d IMPALA-11704: Delay hdfsOpenFile with data cache
d1d4f183d is described below

commit d1d4f183da069b967f7120acfc040e3f6a3598a1
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Thu Nov 3 16:27:51 2022 -0700

    IMPALA-11704: Delay hdfsOpenFile with data cache
    
    Delays hdfsOpenFile until after data cache lookup if using a data cache.
    IMPALA-10147 implemented this, but only when using the file handle
    cache. This patch adds an additional check in case file handle caching
    is disabled.
    
    In networked environments, hdfsOpenFile can take significant time, as
    observed in a TPC-DS run of q90 where TotalRawHdfsOpenFileTime
    represented a majority of time spent for HDFS_SCAN_NODE. This patch
    brings that time to 0 with a primed data cache.
    
    Change-Id: I9429a41fb16de27ccb57730203f95559df0dbfb6
    Reviewed-on: http://gerrit.cloudera.org:8080/19204
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/cache-reader-test-stub.h |  3 ++-
 be/src/runtime/io/file-reader.h            |  3 ++-
 be/src/runtime/io/hdfs-file-reader.cc      | 16 ++++++++++++++--
 be/src/runtime/io/hdfs-file-reader.h       |  9 ++++++++-
 be/src/runtime/io/local-file-reader.cc     |  3 ++-
 be/src/runtime/io/local-file-reader.h      |  3 ++-
 be/src/runtime/io/request-ranges.h         |  3 ++-
 be/src/runtime/io/scan-range.cc            | 10 ++++++----
 tests/common/skip.py                       |  2 +-
 tests/custom_cluster/test_data_cache.py    | 15 ++++++++++++++-
 10 files changed, 53 insertions(+), 14 deletions(-)
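
Before the full diff, a condensed view of the change: HdfsFileReader::Open() now returns early not only when the file handle cache will supply a handle, but also when a remote data cache is configured, deferring the hdfsOpenFile call to the new DoOpen() helper, which ReadFromPos() invokes only on a cache miss. Below is a minimal standalone sketch of that decision logic; FakeIoMgr, FakeScanRange, OpenDecision, and DecideOpen are hypothetical stand-ins for illustration, not Impala types.

// Minimal standalone sketch of the open-path decision added by this patch. FakeIoMgr,
// FakeScanRange, OpenDecision, and DecideOpen are hypothetical stand-ins, not Impala
// classes; the real logic is in HdfsFileReader::Open()/DoOpen() in the hunks below.
#include <iostream>

struct FakeIoMgr {
  bool has_remote_data_cache = true;  // stand-in for remote_data_cache() != nullptr
};

struct FakeScanRange {
  bool use_data_cache = true;   // stand-in for ScanRange::UseDataCache()
  FakeIoMgr* io_mgr = nullptr;  // stand-in for ScanRange::io_mgr_
};

enum class OpenDecision { kSkipHandleCacheSuppliesIt, kDelayUntilCacheMiss, kOpenNow };

// Mirrors the early returns in HdfsFileReader::Open(): skip when the file handle cache
// will supply a handle, delay when a data cache lookup may make the open unnecessary,
// otherwise open the file immediately (DoOpen()).
OpenDecision DecideOpen(const FakeScanRange& range, bool use_file_handle_cache) {
  if (use_file_handle_cache) return OpenDecision::kSkipHandleCacheSuppliesIt;
  if (range.use_data_cache && range.io_mgr != nullptr &&
      range.io_mgr->has_remote_data_cache) {
    return OpenDecision::kDelayUntilCacheMiss;
  }
  return OpenDecision::kOpenNow;
}

int main() {
  FakeIoMgr io_mgr;
  FakeScanRange range;
  range.io_mgr = &io_mgr;
  // File handle caching disabled (e.g. --max_cached_file_handles=0) but a data cache
  // is configured: the open is delayed until a read misses the cache and calls DoOpen().
  bool delayed = DecideOpen(range, /*use_file_handle_cache=*/false)
      == OpenDecision::kDelayUntilCacheMiss;
  std::cout << "open delayed until data cache miss: " << std::boolalpha << delayed << "\n";
  return 0;
}

With a primed data cache the delayed path never reaches DoOpen(), which is why TotalRawHdfsOpenFileTime drops to zero in the q90 run described in the commit message.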

diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
index e9d5b11fa..bc91974a4 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -42,7 +42,8 @@ public:
   }
 
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
+      bool use_file_handle_cache) override {
     DCHECK(false);
     return Status("Not implemented");
   }
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 7d46529f9..742d7d8e2 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -51,7 +51,8 @@ public:
   /// Metrics in 'queue' are updated with the size and latencies of the read
   /// operations on the underlying file system.
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
+      bool use_file_handle_cache) = 0;
 
   /// ***Currently only for HDFS***
   /// When successful, sets 'data' to a buffer that contains the contents of a file,
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index b12878453..0b98441a2 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -65,6 +65,15 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
   // If using file handle caching, the reader does not maintain its own
   // hdfs file handle, so it can skip opening a file handle.
   if (use_file_handle_cache) return Status::OK();
+  if (scan_range_->UseDataCache() &&
+      scan_range_->io_mgr_->remote_data_cache() != nullptr) {
+    // Use delayed open when using a remote data cache.
+    return Status::OK();
+  }
+  return DoOpen();
+}
+
+Status HdfsFileReader::DoOpen() {
   auto io_mgr = scan_range_->io_mgr_;
   // Get a new exclusive file handle.
   RETURN_IF_ERROR(io_mgr->GetExclusiveHdfsFileHandle(hdfs_fs_,
@@ -106,7 +115,7 @@ std::string HdfsFileReader::GetHostList(int64_t file_offset,
 }
 
 Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
+    int64_t bytes_to_read, int64_t* bytes_read, bool* eof, bool use_file_handle_cache) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -156,10 +165,13 @@ Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_
     hdfsFile hdfs_file;
     if (exclusive_hdfs_fh_ != nullptr) {
       hdfs_file = exclusive_hdfs_fh_->file();
-    } else {
+    } else if (use_file_handle_cache) {
       RETURN_IF_ERROR(io_mgr->GetCachedHdfsFileHandle(hdfs_fs_,
           scan_range_->file_string(), scan_range_->mtime(), request_context, &accessor));
       hdfs_file = accessor.Get()->file();
+    } else {
+      RETURN_IF_ERROR(DoOpen());
+      hdfs_file = exclusive_hdfs_fh_->file();
     }
     req_context_read_timer.Start();
 
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index ed8acc724..7f3660b21 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -36,7 +36,8 @@ public:
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
+      bool use_file_handle_cache) override;
   virtual void Close() override;
   virtual void ResetState() override;
   virtual std::string DebugString() const override;
@@ -50,6 +51,12 @@ public:
   virtual void CachedFile(uint8_t** data, int64_t* length) override;
 
 private:
+
+  /// Performs the actual work of opening a file handle. When using a file handle or data
+  /// cache, opening a file handle is delayed until remote data needs to be read. Not
+  /// thread safe, claim lock_ before calling.
+  Status DoOpen();
+
   /// Probes 'remote_data_cache' for a hit. The requested file's name and mtime
   /// are stored in 'scan_range_'. 'file_offset' is the offset into the file to read
   /// and 'bytes_to_read' is the number of bytes requested. On success, copies the
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 4f3ba3d7c..fa94c19f5 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -52,7 +52,8 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
 }
 
 Status LocalFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
-    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
+    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
+    bool use_file_handle_cache) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index 8d474e928..af15dfc80 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -31,7 +31,8 @@ class LocalFileReader : public FileReader {
 
   virtual Status Open(bool use_file_handle_cache) override;
   virtual Status ReadFromPos(DiskQueue* disk_queue, int64_t file_offset, uint8_t* buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
+      int64_t bytes_to_read, int64_t* bytes_read, bool* eof,
+      bool use_file_handle_cache) override;
   /// We don't cache files of the local file system.
   virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 1b355b999..ff3970e49 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -470,7 +470,8 @@ class ScanRange : public RequestRange {
   /// files. 'queue' is updated with the latencies and sizes of reads from the underlying
   /// filesystem.
   Status ReadSubRanges(
-      DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader* file_reader);
+      DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader* file_reader,
+      bool use_file_handle_cache);
 
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this. Need to take a lock on 'lock_' via
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1e33a75a6..1851f3a1b 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -235,9 +235,10 @@ ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue, int disk_id, bool use_lo
         read_status =
             file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
                 min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
-                &buffer_desc->len_, &eof);
+                &buffer_desc->len_, &eof, use_file_handle_cache);
       } else {
-        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
+        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader,
+            use_file_handle_cache);
       }
 
       COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
@@ -328,7 +329,8 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
 }
 
 Status ScanRange::ReadSubRanges(
-    DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader* file_reader) {
+    DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof, FileReader* file_reader,
+    bool use_file_handle_cache) {
   buffer_desc->len_ = 0;
   while (buffer_desc->len() < buffer_desc->buffer_len()
       && sub_range_pos_.index < sub_ranges_.size()) {
@@ -344,7 +346,7 @@ Status ScanRange::ReadSubRanges(
       int64_t current_bytes_read;
       Status read_status = file_reader->ReadFromPos(queue, offset,
           buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
-          eof);
+          eof, use_file_handle_cache);
       if (!read_status.ok()) return read_status;
       if (current_bytes_read != bytes_to_read) {
         DCHECK(*eof);
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 02c3cc021..939d5d9cd 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -130,7 +130,7 @@ class SkipIfNotHdfsMinicluster:
       not IS_HDFS or IS_EC or IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(),
       reason="Test is tuned for 3-node HDFS minicluster with no EC")
   scheduling = pytest.mark.skipif(
-      not IS_HDFS or IS_EC or pytest.config.option.testing_remote_cluster,
+      not (IS_HDFS or IS_OZONE) or IS_EC or pytest.config.option.testing_remote_cluster,
       reason="Test is tuned for scheduling decisions made on a 3-node HDFS minicluster "
              "with no EC")
 
diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py
index c14de17e7..c5c2555d8 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -21,7 +21,6 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
 
 
-@SkipIf.not_hdfs
 @SkipIf.is_buggy_el6_kernel
 @SkipIfNotHdfsMinicluster.scheduling
 class TestDataCache(CustomClusterTestSuite):
@@ -58,6 +57,7 @@ class TestDataCache(CustomClusterTestSuite):
     a single node to make it easier to verify the runtime profile. Also enables higher
     write concurrency and uses a single shard to avoid non-determinism.
     """
+    opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count'
     self.run_test_case('QueryTest/data-cache', vector, unique_database)
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-bytes') >= 0
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-dropped-entries') >= 0
@@ -71,6 +71,12 @@ class TestDataCache(CustomClusterTestSuite):
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-entries') > 0
     assert self.get_metric('impala-server.io-mgr.remote-data-cache-num-writes') > 0
 
+    # Expect all cache hits to result in no opened files.
+    opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count'
+    baseline = self.get_metric(opened_file_handles_metric)
+    self.execute_query("select count(distinct l_orderkey) from test_parquet")
+    assert self.get_metric(opened_file_handles_metric) == baseline
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LRU"),
@@ -85,6 +91,13 @@ class TestDataCache(CustomClusterTestSuite):
   def test_data_cache_deterministic_lirs(self, vector, unique_database):
     self.__test_data_cache_deterministic(vector, unique_database)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=get_impalad_args("LRU") + " --max_cached_file_handles=0",
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  def test_data_cache_deterministic_no_file_handle_cache(self, vector, unique_database):
+    self.__test_data_cache_deterministic(vector, unique_database)
+
   def __test_data_cache(self, vector):
     """ This test scans the same table twice and verifies the cache hit count metrics
     are correct. The exact number of bytes hit is non-deterministic between runs due