You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/05 23:19:53 UTC

[1/2] impala git commit: IMPALA-6364: Bypass file handle cache for ineligible files

Repository: impala
Updated Branches:
  refs/heads/master d3ff67b8b -> d25607d01


IMPALA-6364: Bypass file handle cache for ineligible files

Currently, all HdfsFileHandles are owned and constructed
by the file handle cache. When the file handle cache
is disabled or the file handle is not eligible for
caching, the HdfsFileHandle is stored exclusively in
ScanRange::exclusive_hdfs_fh_, but the HdfsFileHandle still
comes from the file handle cache. It is created via a call to
DiskIoMgr::GetCachedHdfsFileHandle() with 'require_new_handle'
set to true and destroyed via
DiskIoMgr::ReleaseCachedHdfsFileHandle() with 'destroy_handle'
set to true.

Recent testing has revealed that the lock on the file handle
cache is a bottleneck for workloads with many small remote
files. There is no benefit to storing these exclusive file
handles in the file handle cache, as they do not participate
in the caching.

This change introduces DiskIoMgr::GetExclusiveHdfsFileHandle()
and DiskIoMgr::ReleaseExclusiveHdfsFileHandle(). These are
equivalent to the Get/ReleaseCachedHdfsFileHandle() calls, except
they bypass the file handle cache and create/destroy the
file handle directly. ScanRange::Open()/Close(), which
populate and free ScanRange::exclusive_hdfs_fh_, now use
these new calls rather than accessing the file handle cache.
This avoids the locking entirely, solving the bottleneck.

To draw a distinction between the two codepaths, HdfsFileHandle
is now an abstract class with two subclasses:
 - CachedHdfsFileHandles cover all handles that live in the file
   handle cache. Get/ReleaseCachedHdfsFileHandle() use this subclass.
 - ExclusiveHdfsFileHandles cover all cases where a file handle
   does not come from the cache. The new
   Get/ReleaseExclusiveHdfsFileHandle() use this subclass.

Separately, testing revealed that increasing the number of
partitions for the file handle cache also fixes the contention
problem. This change also makes the number of file handle
cache partitions configurable via the startup parameter
num_file_handle_cache_partitions. This allows mitigation of
future bottlenecks without a patch.

Change-Id: I4ab52b0884a909a4faeb6692f32d45878ea2838f
Reviewed-on: http://gerrit.cloudera.org:8080/8945
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d1a0510b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d1a0510b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d1a0510b

Branch: refs/heads/master
Commit: d1a0510bfe0a168256d37904aca3a30994306454
Parents: d3ff67b
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Wed Jan 3 19:02:19 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 5 21:21:46 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/disk-io-mgr.cc        | 51 +++++++++++++++++++----
 be/src/runtime/io/disk-io-mgr.h         | 34 ++++++++--------
 be/src/runtime/io/handle-cache.h        | 44 ++++++++++++++------
 be/src/runtime/io/handle-cache.inline.h | 61 ++++++++++++++--------------
 be/src/runtime/io/request-ranges.h      | 17 ++++----
 be/src/runtime/io/scan-range.cc         | 18 ++++----
 6 files changed, 138 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 668fc75..4854bd6 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -116,6 +116,14 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 270, "Maximum time, in seconds, th
     "unused HDFS file handle will remain in the file handle cache. Disabled if set "
     "to 0.");
 
+// The file handle cache is split into multiple independent partitions, each with its
+// own lock and structures. A larger number of partitions reduces contention by
+// concurrent accesses, but it also reduces the efficiency of the cache due to
+// separate LRU lists.
+// TODO: Test different number of partitions to determine an appropriate default
+DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used by the "
+    "file handle cache.");
+
 // The IoMgr is able to run with a wide range of memory usage. If a query has memory
 // remaining less than this value, the IoMgr will stop all buffering regardless of the
 // current queue size.
@@ -224,6 +232,7 @@ DiskIoMgr::DiskIoMgr() :
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
         FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
@@ -251,6 +260,7 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
         FileSystemUtil::MaxNumFileHandles()),
+        FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
@@ -1166,15 +1176,36 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
   return disk_id % num_local_disks();
 }
 
-HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
-    std::string* fname, int64_t mtime, RequestContext *reader,
-    bool require_new) {
+ExclusiveHdfsFileHandle* DiskIoMgr::GetExclusiveHdfsFileHandle(const hdfsFS& fs,
+    std::string* fname, int64_t mtime, RequestContext *reader) {
+  ExclusiveHdfsFileHandle* fid = new ExclusiveHdfsFileHandle(fs, fname->data(), mtime);
+  if (!fid->ok()) {
+    VLOG_FILE << "Opening the file " << fname << " failed.";
+    delete fid;
+    return nullptr;
+  }
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+  // Every exclusive file handle is considered a cache miss
+  ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L);
+  ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L);
+  reader->cached_file_handles_miss_count_.Add(1L);
+  return fid;
+}
+
+void DiskIoMgr::ReleaseExclusiveHdfsFileHandle(ExclusiveHdfsFileHandle* fid) {
+  DCHECK(fid != nullptr);
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+  delete fid;
+}
+
+CachedHdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
+    std::string* fname, int64_t mtime, RequestContext *reader) {
   bool cache_hit;
-  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, require_new,
+  CachedHdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, false,
       &cache_hit);
   if (fh == nullptr) return nullptr;
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
   if (cache_hit) {
-    DCHECK(!require_new);
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
     ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
     reader->cached_file_handles_hit_count_.Add(1L);
@@ -1186,19 +1217,21 @@ HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
   return fh;
 }
 
-void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
-    bool destroy_handle) {
-  file_handle_cache_.ReleaseFileHandle(fname, fid, destroy_handle);
+void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname,
+    CachedHdfsFileHandle* fid) {
+  file_handle_cache_.ReleaseFileHandle(fname, fid, false);
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
 }
 
 Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
-    int64_t mtime, HdfsFileHandle** fid) {
+    int64_t mtime, CachedHdfsFileHandle** fid) {
   bool cache_hit;
   file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
   // The old handle has been destroyed, so *fid must be overwritten before returning.
   *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
       &cache_hit);
   if (*fid == nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
     return Status(TErrorCode::DISK_IO_ERROR,
         GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 71dc840..4fa078b 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -338,25 +338,28 @@ class DiskIoMgr : public CacheLineAligned {
   /// for debugging.
   bool Validate() const;
 
-  /// Given a FS handle, name and last modified time of the file, gets an HdfsFileHandle
-  /// from the file handle cache. If 'require_new_handle' is true, the cache will open
-  /// a fresh file handle. On success, records statistics about whether this was
-  /// a cache hit or miss in the 'reader' as well as at the system level. In case of an
-  /// error returns nullptr.
-  HdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
-      std::string* fname, int64_t mtime, RequestContext *reader,
-      bool require_new_handle);
+  /// Given a FS handle, name and last modified time of the file, construct a new
+  /// ExclusiveHdfsFileHandle. In the case of an error, returns nullptr.
+  ExclusiveHdfsFileHandle* GetExclusiveHdfsFileHandle(const hdfsFS& fs,
+      std::string* fname, int64_t mtime, RequestContext* reader);
+
+  /// Releases an exclusive file handle, destroying it
+  void ReleaseExclusiveHdfsFileHandle(ExclusiveHdfsFileHandle* fid);
+
+  /// Given a FS handle, name and last modified time of the file, gets a
+  /// CachedHdfsFileHandle from the file handle cache. On success, records statistics
+  /// about whether this was a cache hit or miss in the 'reader' as well as at the
+  /// system level. In case of an error returns nullptr.
+  CachedHdfsFileHandle* GetCachedHdfsFileHandle(const hdfsFS& fs,
+      std::string* fname, int64_t mtime, RequestContext* reader);
 
   /// Releases a file handle back to the file handle cache when it is no longer in use.
-  /// If 'destroy_handle' is true, the file handle cache will close the file handle
-  /// immediately.
-  void ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid,
-      bool destroy_handle);
+  void ReleaseCachedHdfsFileHandle(std::string* fname, CachedHdfsFileHandle* fid);
 
   /// Reopens a file handle by destroying the file handle and getting a fresh
   /// file handle from the cache. Returns an error if the file could not be reopened.
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      HdfsFileHandle** fid);
+      CachedHdfsFileHandle** fid);
 
   /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
   /// 'bytes_to_free' is -1.
@@ -470,13 +473,10 @@ class DiskIoMgr : public CacheLineAligned {
   /// round-robin assignment for that case.
   static AtomicInt32 next_disk_id_;
 
-  // Number of file handle cache partitions to use
-  static const size_t NUM_FILE_HANDLE_CACHE_PARTITIONS = 16;
-
   // Caching structure that maps file names to cached file handles. The cache has an upper
   // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file
   // handles are closed.
-  FileHandleCache<NUM_FILE_HANDLE_CACHE_PARTITIONS> file_handle_cache_;
+  FileHandleCache file_handle_cache_;
 
   /// Returns the index into free_buffers_ for a given buffer size
   int free_buffers_idx(int64_t buffer_size);

http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/handle-cache.h b/be/src/runtime/io/handle-cache.h
index 78f91cd..232b630 100644
--- a/be/src/runtime/io/handle-cache.h
+++ b/be/src/runtime/io/handle-cache.h
@@ -35,16 +35,15 @@
 namespace impala {
 namespace io {
 
-/// This class is a small wrapper around the hdfsFile handle and the file system
+/// This abstract class is a small wrapper around the hdfsFile handle and the file system
 /// instance which is needed to close the file handle. The handle incorporates
 /// the last modified time of the file when it was opened. This is used to distinguish
 /// between file handles for files that can be updated or overwritten.
+/// This is used only through its subclasses, CachedHdfsFileHandle and
+/// ExclusiveHdfsFileHandle.
 class HdfsFileHandle {
  public:
 
-  /// Constructor will open the file
-  HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
-
   /// Destructor will close the file handle
   ~HdfsFileHandle();
 
@@ -52,12 +51,32 @@ class HdfsFileHandle {
   int64_t mtime() const { return mtime_; }
   bool ok() const { return hdfs_file_ != nullptr; }
 
+ protected:
+  /// Constructor will open the file
+  HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
+
  private:
   hdfsFS fs_;
   hdfsFile hdfs_file_;
   int64_t mtime_;
 };
 
+/// CachedHdfsFileHandles are owned by the file handle cache and are used for no
+/// other purpose.
+class CachedHdfsFileHandle : public HdfsFileHandle {
+ public:
+  CachedHdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
+  ~CachedHdfsFileHandle();
+};
+
+/// ExclusiveHdfsFileHandles are used for all purposes where a CachedHdfsFileHandle
+/// is not appropriate.
+class ExclusiveHdfsFileHandle : public HdfsFileHandle {
+ public:
+  ExclusiveHdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime)
+    : HdfsFileHandle(fs, fname, mtime) {}
+};
+
 /// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
 /// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
 /// Each partition operates independently with its own locks, reducing contention
@@ -84,14 +103,14 @@ class HdfsFileHandle {
 ///
 /// TODO: The cache should also evict file handles more aggressively if the file handle's
 /// mtime is older than the file's current mtime.
-template <size_t NUM_PARTITIONS>
 class FileHandleCache {
  public:
   /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
   /// partitions. If the capacity does not split evenly, then the capacity is rounded
   /// up. The cache will age out any file handle that is unused for
   /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero.
-  FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs);
+  FileHandleCache(size_t capacity, size_t num_partitions,
+      uint64_t unused_handle_timeout_secs);
 
   /// Destructor is only called for backend tests
   ~FileHandleCache();
@@ -113,14 +132,15 @@ class FileHandleCache {
   ///
   /// This obtains exclusive control over the returned file handle. It must be paired
   /// with a call to ReleaseFileHandle to release exclusive control.
-  HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
-      bool require_new_handle, bool* cache_hit);
+  CachedHdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname,
+      int64_t mtime, bool require_new_handle, bool* cache_hit);
 
   /// Release the exclusive hold on the specified file handle (which was obtained
   /// by calling GetFileHandle). The cache may evict a file handle if the cache is
   /// above capacity. If 'destroy_handle' is true, immediately remove this handle
   /// from the cache.
-  void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
+  void ReleaseFileHandle(std::string* fname, CachedHdfsFileHandle* fh,
+      bool destroy_handle);
 
  private:
   struct FileHandleEntry;
@@ -134,9 +154,9 @@ class FileHandleCache {
   typedef std::list<LruListEntry> LruListType;
 
   struct FileHandleEntry {
-    FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list)
+    FileHandleEntry(CachedHdfsFileHandle* fh_in, LruListType& lru_list)
     : fh(fh_in), lru_entry(lru_list.end()) {}
-    std::unique_ptr<HdfsFileHandle> fh;
+    std::unique_ptr<CachedHdfsFileHandle> fh;
 
     /// in_use is true for a file handle checked out via GetFileHandle() that has not
     /// been returned via ReleaseFileHandle().
@@ -180,7 +200,7 @@ class FileHandleCache {
   /// enforce the capacity.
   void EvictHandles(FileHandleCachePartition& p);
 
-  std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
+  std::vector<FileHandleCachePartition> cache_partitions_;
 
   /// Maximum time before an unused file handle is aged out of the cache.
   /// Aging out is disabled if this is set to 0.

http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/handle-cache.inline.h b/be/src/runtime/io/handle-cache.inline.h
index 10db49e..f0cced0 100644
--- a/be/src/runtime/io/handle-cache.inline.h
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -30,13 +30,11 @@ namespace io {
 HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname,
     int64_t mtime)
     : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
-  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
   VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
 }
 
 HdfsFileHandle::~HdfsFileHandle() {
   if (hdfs_file_ != nullptr && fs_ != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
     VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
     hdfsCloseFile(fs_, hdfs_file_);
   }
@@ -44,13 +42,23 @@ HdfsFileHandle::~HdfsFileHandle() {
   hdfs_file_ = nullptr;
 }
 
-template <size_t NUM_PARTITIONS>
-  FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity,
-      uint64_t unused_handle_timeout_secs)
-  : unused_handle_timeout_secs_(unused_handle_timeout_secs) {
-  DCHECK_GT(NUM_PARTITIONS, 0);
-  size_t remainder = capacity % NUM_PARTITIONS;
-  size_t base_capacity = capacity / NUM_PARTITIONS;
+CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const char* fname,
+    int64_t mtime)
+  : HdfsFileHandle(fs, fname, mtime) {
+  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
+}
+
+CachedHdfsFileHandle::~CachedHdfsFileHandle() {
+  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
+}
+
+FileHandleCache::FileHandleCache(size_t capacity,
+      size_t num_partitions, uint64_t unused_handle_timeout_secs)
+  : cache_partitions_(num_partitions),
+  unused_handle_timeout_secs_(unused_handle_timeout_secs) {
+  DCHECK_GT(num_partitions, 0);
+  size_t remainder = capacity % num_partitions;
+  size_t base_capacity = capacity / num_partitions;
   size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
   for (FileHandleCachePartition& p : cache_partitions_) {
     p.size = 0;
@@ -58,29 +66,25 @@ template <size_t NUM_PARTITIONS>
   }
 }
 
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry(
+FileHandleCache::LruListEntry::LruListEntry(
     typename MapType::iterator map_entry_in)
      : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {}
 
-template <size_t NUM_PARTITIONS>
-FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
+FileHandleCache::~FileHandleCache() {
   shut_down_promise_.Set(true);
   if (eviction_thread_ != nullptr) eviction_thread_->Join();
 }
 
-template <size_t NUM_PARTITIONS>
-Status FileHandleCache<NUM_PARTITIONS>::Init() {
+Status FileHandleCache::Init() {
   return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
-      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
+      &FileHandleCache::EvictHandlesLoop, this, &eviction_thread_);
 }
 
-template <size_t NUM_PARTITIONS>
-HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
+CachedHdfsFileHandle* FileHandleCache::GetFileHandle(
     const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
     bool* cache_hit) {
   // Hash the key and get appropriate partition
-  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % cache_partitions_.size();
   FileHandleCachePartition& p = cache_partitions_[index];
   boost::lock_guard<SpinLock> g(p.lock);
   pair<typename MapType::iterator, typename MapType::iterator> range =
@@ -112,7 +116,7 @@ HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
   if (!ret_elem) {
     *cache_hit = false;
     // Create a new entry and move it into the map
-    HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
+    CachedHdfsFileHandle* new_fh = new CachedHdfsFileHandle(fs, fname->data(), mtime);
     if (!new_fh->ok()) {
       delete new_fh;
       return nullptr;
@@ -128,16 +132,14 @@ HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
   DCHECK(ret_elem->fh.get() != nullptr);
   DCHECK(!ret_elem->in_use);
   ret_elem->in_use = true;
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
   return ret_elem->fh.get();
 }
 
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
-    HdfsFileHandle* fh, bool destroy_handle) {
+void FileHandleCache::ReleaseFileHandle(std::string* fname,
+    CachedHdfsFileHandle* fh, bool destroy_handle) {
   DCHECK(fh != nullptr);
   // Hash the key and get appropriate partition
-  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % cache_partitions_.size();
   FileHandleCachePartition& p = cache_partitions_[index];
   boost::lock_guard<SpinLock> g(p.lock);
   pair<typename MapType::iterator, typename MapType::iterator> range =
@@ -157,7 +159,6 @@ void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
   FileHandleEntry* release_elem = &release_it->second;
   DCHECK(release_elem->in_use);
   release_elem->in_use = false;
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
   if (destroy_handle) {
     --p.size;
     p.cache.erase(release_it);
@@ -186,8 +187,7 @@ void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
   }
 }
 
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
+void FileHandleCache::EvictHandlesLoop() {
   while (true) {
     for (FileHandleCachePartition& p : cache_partitions_) {
       boost::lock_guard<SpinLock> g(p.lock);
@@ -203,9 +203,8 @@ void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() {
   DCHECK(shut_down_promise_.Get());
 }
 
-template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
-    FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
+void FileHandleCache::EvictHandles(
+    FileHandleCache::FileHandleCachePartition& p) {
   uint64_t now = MonotonicSeconds();
   uint64_t oldest_allowed_timestamp =
       now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index c1b3bbe..222f847 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -34,7 +34,7 @@ class MemTracker;
 namespace io {
 class DiskIoMgr;
 class RequestContext;
-class HdfsFileHandle;
+class ExclusiveHdfsFileHandle;
 class ScanRange;
 
 /// Buffer struct that is used by the caller and IoMgr to pass read buffers.
@@ -279,10 +279,9 @@ class ScanRange : public RequestRange {
   /// range will not maintain an exclusive file handle. It will borrow an hdfs file
   /// handle from the file handle cache for each Read(), so Open() does nothing.
   /// If 'use_file_handle_cache' is false or this is a remote hdfs file or this is
-  /// a local OS file, Open() will maintain a file handle on the scan range for
-  /// exclusive use by this scan range. An exclusive hdfs file handle still comes
-  /// from the cache, but it is a newly opened file handle that is held for the
-  /// entire duration of a scan range's lifetime and destroyed in Close().
+  /// a local OS file, Open() will open a file handle on the scan range for
+  /// exclusive use by this scan range. The scan range is the exclusive owner of the
+  /// file handle, and the file handle is destroyed in Close().
   /// All local OS files are opened using normal OS file APIs.
   Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
@@ -339,16 +338,16 @@ class ScanRange : public RequestRange {
   RequestContext* reader_ = nullptr;
 
   /// File handle either to hdfs or local fs (FILE*)
-  /// The hdfs file handle is only stored here in three cases:
+  /// The hdfs file handle is stored here in three cases:
   /// 1. The file handle cache is off (max_cached_file_handles == 0).
   /// 2. The scan range is using hdfs caching.
   /// -OR-
   /// 3. The hdfs file is expected to be remote (expected_local_ == false)
-  /// In each case, the scan range gets a new file handle from the file handle cache
-  /// at Open(), holds it exclusively, and destroys it in Close().
+  /// In each case, the scan range gets a new ExclusiveHdfsFileHandle at Open(),
+  /// owns it exclusively, and destroys it in Close().
   union {
     FILE* local_file_ = nullptr;
-    HdfsFileHandle* exclusive_hdfs_fh_;
+    ExclusiveHdfsFileHandle* exclusive_hdfs_fh_;
   };
 
   /// Tagged union that holds a buffer for the cases when there is a buffer allocated

http://git-wip-us.apache.org/repos/asf/impala/blob/d1a0510b/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index b7655a8..dc14050 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -272,16 +272,16 @@ Status ScanRange::Open(bool use_file_handle_cache) {
     // for each scan range.
     if (use_file_handle_cache && expected_local_) return Status::OK();
     // Get a new exclusive file handle.
-    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-        mtime(), reader_, true);
+    exclusive_hdfs_fh_ = io_mgr_->GetExclusiveHdfsFileHandle(fs_, file_string(),
+        mtime(), reader_);
     if (exclusive_hdfs_fh_ == nullptr) {
       return Status(TErrorCode::DISK_IO_ERROR,
           GetHdfsErrorMsg("Failed to open HDFS file ", file_));
     }
 
     if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
-      // Destroy the file handle and remove it from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
+      // Destroy the file handle
+      io_mgr_->ReleaseExclusiveHdfsFileHandle(exclusive_hdfs_fh_);
       exclusive_hdfs_fh_ = nullptr;
       return Status(TErrorCode::DISK_IO_ERROR,
           Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
@@ -321,8 +321,8 @@ void ScanRange::Close() {
         external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
       }
 
-      // Destroy the file handle and remove it from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
+      // Destroy the file handle.
+      io_mgr_->ReleaseExclusiveHdfsFileHandle(exclusive_hdfs_fh_);
       exclusive_hdfs_fh_ = nullptr;
       closed_file = true;
     }
@@ -395,7 +395,7 @@ Status ScanRange::Read(
   DCHECK_GE(bytes_to_read, 0);
 
   if (fs_ != nullptr) {
-    HdfsFileHandle* borrowed_hdfs_fh = nullptr;
+    CachedHdfsFileHandle* borrowed_hdfs_fh = nullptr;
     hdfsFile hdfs_file;
 
     // If the scan range has an exclusive file handle, use it. Otherwise, borrow
@@ -404,7 +404,7 @@ Status ScanRange::Read(
       hdfs_file = exclusive_hdfs_fh_->file();
     } else {
       borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-          mtime(), reader_, false);
+          mtime(), reader_);
       if (borrowed_hdfs_fh == nullptr) {
         return Status(TErrorCode::DISK_IO_ERROR,
             GetHdfsErrorMsg("Failed to open HDFS file ", file_));
@@ -484,7 +484,7 @@ Status ScanRange::Read(
     }
 
     if (borrowed_hdfs_fh != nullptr) {
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false);
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh);
     }
     if (!status.ok()) return status;
   } else {


[2/2] impala git commit: IMPALA-6362: avoid Reservation/MemTracker deadlock

Posted by ta...@apache.org.
IMPALA-6362: avoid Reservation/MemTracker deadlock

Avoid the circular dependency between ReservationTracker::lock_ and
MemTracker::child_trackers_lock_ by not acquiring
ReservationTracker::lock_ in GetReservation(), where an atomic
operation is sufficient.

Testing:
Added a unit test that reproduced the deadlock.

Change-Id: Id7adbe961a925075422c685690dd3d1609779ced
Reviewed-on: http://gerrit.cloudera.org:8080/8933
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d25607d0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d25607d0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d25607d0

Branch: refs/heads/master
Commit: d25607d01b90abe6fdb7a422278688876fa54594
Parents: d1a0510
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 3 15:38:21 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 5 22:57:21 2018 +0000

----------------------------------------------------------------------
 .../bufferpool/reservation-tracker-test.cc      | 44 ++++++++++++++++++++
 .../runtime/bufferpool/reservation-tracker.cc   | 25 ++++++-----
 be/src/runtime/bufferpool/reservation-tracker.h | 15 ++++---
 be/src/util/memory-metrics.cc                   |  4 ++
 be/src/util/memory-metrics.h                    |  2 +
 5 files changed, 75 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d25607d0/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index b6717f4..c46c5ea 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -26,6 +26,7 @@
 #include "common/init.h"
 #include "common/object-pool.h"
 #include "runtime/mem-tracker.h"
+#include "util/memory-metrics.h"
 #include "testutil/gtest-util.h"
 
 #include "common/names.h"
@@ -55,6 +56,12 @@ class ReservationTrackerTest : public ::testing::Test {
     return RuntimeProfile::Create(&obj_pool_, "test profile");
   }
 
+  BufferPoolMetric* CreateReservationMetric(ReservationTracker* tracker) {
+    return obj_pool_.Add(new BufferPoolMetric(MetricDefs::Get("buffer-pool.reserved"),
+          BufferPoolMetric::BufferPoolMetricType::RESERVED, tracker,
+          nullptr));
+  }
+
   ObjectPool obj_pool_;
 
   ReservationTracker root_;
@@ -533,6 +540,43 @@ TEST_F(ReservationTrackerTest, ReservationUtil) {
       ReservationUtil::GetMinMemLimitFromReservation(4 * GIG)));
 }
 
+static void LogUsageThread(MemTracker* mem_tracker, AtomicInt32* done) {
+  while (done->Load() == 0) {
+    int64_t logged_consumption;
+    mem_tracker->LogUsage(10, "  ", &logged_consumption);
+  }
+}
+
+// IMPALA-6362: regression test for deadlock between ReservationTracker and MemTracker.
+TEST_F(ReservationTrackerTest, MemTrackerDeadlock) {
+  const int64_t RESERVATION_LIMIT = 1024;
+  root_.InitRootTracker(nullptr, numeric_limits<int64_t>::max());
+  MemTracker* mem_tracker = obj_pool_.Add(new MemTracker);
+  ReservationTracker* reservation = obj_pool_.Add(new ReservationTracker());
+  reservation->InitChildTracker(nullptr, &root_, mem_tracker, RESERVATION_LIMIT);
+
+  // Create a child MemTracker with a buffer pool consumption metric, that calls
+  // reservation->GetReservation() when its usage is logged.
+  obj_pool_.Add(new MemTracker(CreateReservationMetric(reservation),
+        -1, "Reservation", mem_tracker));
+  // Start background thread that repeatedly logs the 'mem_tracker' tree.
+  AtomicInt32 done(0);
+  thread log_usage_thread(&LogUsageThread, mem_tracker, &done);
+
+  // Retry enough times to reproduce the deadlock with LogUsageThread().
+  for (int i = 0; i < 100; ++i) {
+    // Fail to increase reservation, hitting limit of 'reservation'. This will try
+    // to log the 'mem_tracker' tree while holding reservation->lock_.
+    Status err;
+    ASSERT_FALSE(reservation->IncreaseReservation(RESERVATION_LIMIT + 1, &err));
+    ASSERT_FALSE(err.ok());
+  }
+
+  done.Store(1);
+  log_usage_thread.join();
+  reservation->Close();
+  mem_tracker->Close();
+}
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/impala/blob/d25607d0/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index 1c84912..aba5dce 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -21,6 +21,7 @@
 #include <cstdlib>
 
 #include "common/object-pool.h"
+#include "gutil/atomicops.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
@@ -189,10 +190,12 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
       }
       *error_status = Status::Expected(error_msg);
     }
+  } else if (parent_ == nullptr) {
+    // No parent and no linked MemTracker - increase can be granted.
+    DCHECK(mem_tracker_ == nullptr) << "Root cannot have linked MemTracker";
+    granted = true;
   } else {
-    if (parent_ == nullptr) {
-      granted = true;
-    } else {
+    {
       lock_guard<SpinLock> l(parent_->lock_);
       granted = parent_->IncreaseReservationInternalLocked(
           reservation_increase, true, true, error_status);
@@ -208,7 +211,6 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
       parent_->DecreaseReservation(reservation_increase, true);
     }
   }
-
   if (granted) {
     // The reservation was granted and state updated in all ancestors: we can modify
     // this tracker's state now.
@@ -374,15 +376,17 @@ void ReservationTracker::ReleaseTo(int64_t bytes) {
 }
 
 int64_t ReservationTracker::GetReservation() {
-  lock_guard<SpinLock> l(lock_);
+  // Don't acquire lock - there is no point in holding it for this function only since
+  // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return reservation_;
+  return base::subtle::Acquire_Load(&reservation_);
 }
 
 int64_t ReservationTracker::GetUsedReservation() {
-  lock_guard<SpinLock> l(lock_);
+  // Don't acquire lock - there is no point in holding it for this function only since
+  // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return used_reservation_;
+  return base::subtle::Acquire_Load(&used_reservation_);
 }
 
 int64_t ReservationTracker::GetUnusedReservation() {
@@ -392,9 +396,10 @@ int64_t ReservationTracker::GetUnusedReservation() {
 }
 
 int64_t ReservationTracker::GetChildReservations() {
-  lock_guard<SpinLock> l(lock_);
+  // Don't acquire lock - there is no point in holding it for this function only since
+  // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return child_reservations_;
+  return base::subtle::Acquire_Load(&child_reservations_);
 }
 
 void ReservationTracker::CheckConsistency() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/d25607d0/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 337e12b..ff4b77e 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -154,18 +154,19 @@ class ReservationTracker {
   /// 'bytes' before calling this method.
   void ReleaseTo(int64_t bytes);
 
-  /// Returns the amount of the reservation in bytes.
+  /// Returns the amount of the reservation in bytes. Does not acquire the internal lock.
   int64_t GetReservation();
 
   /// Returns the current amount of the reservation used at this tracker, not including
-  /// reservations of children in bytes.
+  /// reservations of children in bytes. Does not acquire the internal lock.
   int64_t GetUsedReservation();
 
   /// Returns the amount of the reservation neither used nor given to childrens'
-  /// reservations at this tracker in bytes.
+  /// reservations at this tracker in bytes. Acquires the internal lock.
   int64_t GetUnusedReservation();
 
-  /// Returns the total reservations of children in bytes.
+  /// Returns the total reservations of children in bytes. Does not acquire the
+  /// internal lock.
   int64_t GetChildReservations();
 
   /// Support for debug actions: deny reservation increase with probability 'probability'.
@@ -292,18 +293,22 @@ class ReservationTracker {
   /// TODO: remove once all memory is accounted via ReservationTrackers.
   MemTracker* mem_tracker_ = nullptr;
 
-  /// The maximum reservation in bytes that this tracker can have.
+  /// The maximum reservation in bytes that this tracker can have. Can be read with an
+  /// atomic load without holding lock.
   int64_t reservation_limit_;
 
   /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
+  /// Can be read with an atomic load without holding lock.
   int64_t reservation_;
 
   /// Total reservation of children in bytes. This is included in 'reservation_'.
   /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+  /// Can be read with an atomic load without holding lock.
   int64_t child_reservations_;
 
   /// The amount of the reservation currently used by this tracker in bytes.
   /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+  /// Can be read with an atomic load without holding lock.
   int64_t used_reservation_;
 };
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d25607d0/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index d1e7a16..3308bf4 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -264,6 +264,10 @@ BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType t
     buffer_pool_(buffer_pool) {}
 
 void BufferPoolMetric::CalculateValue() {
+  // IMPALA-6362: we have to be careful that none of the below calls to ReservationTracker
+  // methods acquire ReservationTracker::lock_ to avoid a potential circular dependency
+  // with MemTracker::child_trackers_lock_, which may be held when refreshing MemTracker
+  // consumption.
   switch (type_) {
     case BufferPoolMetricType::LIMIT:
       value_ = buffer_pool_->GetSystemBytesLimit();

http://git-wip-us.apache.org/repos/asf/impala/blob/d25607d0/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 5491f8c..3294c30 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -210,6 +210,8 @@ class BufferPoolMetric : public IntGauge {
   virtual void CalculateValue();
 
  private:
+  friend class ReservationTrackerTest;
+
   enum class BufferPoolMetricType {
     LIMIT, // Limit on memory allocated to buffers.
     // Total amount of buffer memory allocated from the system. Always <= LIMIT.