You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/08 23:58:09 UTC

(impala) branch master updated: IMPALA-12681: Release file descriptors for partially written temporary files

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 56a7514ba IMPALA-12681: Release file descriptors for partially written temporary files
56a7514ba is described below

commit 56a7514ba6e55f04d883067144b786c9cee7601a
Author: Yida Wu <yi...@cloudera.com>
AuthorDate: Wed Jan 3 19:29:49 2024 -0800

    IMPALA-12681: Release file descriptors for partially written temporary files
    
    This patch fixes a bug where partially written temporary files were
    removed without releasing their file descriptors. The fix adds a
    call to the local file writer's Close() in DiskFile::Delete(), which
    is invoked when the local buffer file is evicted or when the query
    ends, so the file handle is released before the file is removed.
    
    Testing:
    Passed core tests.
    Additionally, a check has been added to the test
    test_scratch_disk.py to verify that no impalad process holds
    deleted files open under /proc/<pid>/fd/.
    
    Change-Id: I58a2bac419ced806d6f5a32bcdf24d79e078ab14
    Reviewed-on: http://gerrit.cloudera.org:8080/20852
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/disk-file.cc            |  5 +++++
 be/src/runtime/io/disk-io-mgr-test.cc     |  1 +
 be/src/runtime/io/local-file-writer.cc    |  5 +++++
 be/src/runtime/io/local-file-writer.h     |  2 +-
 be/src/runtime/tmp-file-mgr-test.cc       |  5 +++--
 tests/custom_cluster/test_scratch_disk.py | 27 ++++++++++++++++++++++++++-
 6 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/io/disk-file.cc b/be/src/runtime/io/disk-file.cc
index 85a378653..b588c2e60 100644
--- a/be/src/runtime/io/disk-file.cc
+++ b/be/src/runtime/io/disk-file.cc
@@ -37,6 +37,11 @@ Status DiskFile::Delete(const unique_lock<shared_mutex>& lock) {
   // No support for remote file deletion yet.
   if (disk_type_ == DiskFileType::LOCAL_BUFFER || disk_type_ == DiskFileType::LOCAL) {
     if (is_deleted(status_l)) return DISK_FILE_DELETE_FAILED_INCORRECT_STATUS;
+    if (file_writer_ != nullptr) {
+      // Close the file writer to release the file handle.
+      RETURN_IF_ERROR(file_writer_->Close());
+    }
+    // Remove the local physical file if it exists.
     RETURN_IF_ERROR(FileSystemUtil::RemovePaths({path_}));
     SetStatusLocked(io::DiskFileStatus::DELETED, status_l);
   }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 09593f705..f6d88bcee 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -2220,6 +2220,7 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
   EXPECT_EQ(*(int32_t*)client_buffer.data(), *data);
   scan_range->ReturnBuffer(move(io_buffer));
 
+  ASSERT_TRUE((*new_tmp_file_obj)->Remove().ok());
   num_ranges_written_ = 0;
   tmp_file_grp->Close();
   io_mgr.UnregisterContext(io_ctx.get());
diff --git a/be/src/runtime/io/local-file-writer.cc b/be/src/runtime/io/local-file-writer.cc
index 0395fd34d..9c70d47d0 100644
--- a/be/src/runtime/io/local-file-writer.cc
+++ b/be/src/runtime/io/local-file-writer.cc
@@ -30,6 +30,11 @@
 namespace impala {
 namespace io {
 
+LocalFileWriter::~LocalFileWriter() {
+  // Only verifies that the file handle was already released; does not close it.
+  DCHECK(file_ == nullptr);
+}
+
 Status LocalFileWriter::Open() {
   lock_guard<mutex> lock(lock_);
   if (file_ != nullptr) return Status::OK();
diff --git a/be/src/runtime/io/local-file-writer.h b/be/src/runtime/io/local-file-writer.h
index 23bf66ca6..44f28f7d3 100644
--- a/be/src/runtime/io/local-file-writer.h
+++ b/be/src/runtime/io/local-file-writer.h
@@ -28,7 +28,7 @@ class LocalFileWriter : public FileWriter {
  public:
   LocalFileWriter(DiskIoMgr* io_mgr, const char* file_path, int64_t file_size = 0)
     : FileWriter(io_mgr, file_path, file_size) {}
-  ~LocalFileWriter() {}
+  ~LocalFileWriter();
 
   virtual Status Open() override;
   virtual Status Write(WriteRange* range, int64_t* written_bytes) override;
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 1ff4d300e..c00d0480b 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -1308,8 +1308,9 @@ void TmpFileMgrTest::TestCompressBufferManagement(
       }
       EXPECT_EQ(tmp_file->DiskFile()->GetFileStatus(), io::DiskFileStatus::PERSISTED);
       // Remove the local buffer to enforce reading from the remote file.
-      tmp_file->GetWriteFile()->SetStatus(io::DiskFileStatus::DELETED);
-      EXPECT_OK(FileSystemUtil::RemovePaths({tmp_file->GetWriteFile()->path()}));
+      unique_lock<shared_mutex> buffer_file_lock(
+          *(tmp_file->GetWriteFile()->GetFileLock()));
+      EXPECT_OK(tmp_file->GetWriteFile()->Delete(buffer_file_lock));
     };
     wait_upload_func(compressed_handle->file_);
     wait_upload_func(uncompressed_handle->file_);
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index db1c1c08c..a00fff04d 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -24,12 +24,13 @@ import pytest
 import re
 import shutil
 import stat
+import subprocess
 import tempfile
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.verifiers.metric_verifier import MetricVerifier
 from tests.common.skip import SkipIf
 from tests.util.hdfs_util import NAMENODE
+from tests.verifiers.metric_verifier import MetricVerifier
 
 class TestScratchDir(CustomClusterTestSuite):
   @classmethod
@@ -96,6 +97,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def teardown_method(self, method):
     for dir_path in self.created_dirs:
       shutil.rmtree(dir_path, ignore_errors=True)
+    self.check_deleted_file_fd()
 
   @pytest.mark.execute_serially
   def test_multiple_dirs(self, vector):
@@ -279,6 +281,29 @@ class TestScratchDir(CustomClusterTestSuite):
   def dfs_tmp_path(self):
     return "{}/tmp".format(NAMENODE)
 
+  def find_deleted_files_in_fd(self, pid):
+    fd_path = "/proc/{}/fd".format(pid)
+    command = "find {} -ls | grep '(deleted)'".format(fd_path)
+    try:
+      result = subprocess.check_output(command, shell=True)
+      return result.strip()
+    except subprocess.CalledProcessError as e:
+      if not e.output:
+        # Empty output: grep matched nothing, i.e. no deleted files are held open.
+        return None
+      return "Error checking the fd path with error '{}'".format(e)
+
+  def check_deleted_file_fd(self):
+    # Assert that no impalad holds an fd to a deleted file under /proc/<pid>/fd.
+    # Regression test for IMPALA-12681.
+    pids = []
+    for impalad in self.cluster.impalads:
+      pids.append(impalad.get_pid())
+    assert pids
+    for pid in pids:
+      deleted_files = self.find_deleted_files_in_fd(pid)
+      assert deleted_files is None
+
   @pytest.mark.execute_serially
   @SkipIf.not_scratch_fs
   def test_scratch_dirs_remote_spill(self, vector):