You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2017/05/27 05:34:32 UTC

[02/11] incubator-impala git commit: IMPALA-4623: Enable file handle cache

IMPALA-4623: Enable file handle cache

Currently, every scan range maintains a file handle, even
when multiple scan ranges are accessing the same file.
Opening the file handles causes load on the NameNode, which
can lead to scaling issues.

There are two parts to this transaction:
1. Enable file handle caching by default for local files
2. Share the file handle between scan ranges from the same
file

Local scan ranges no longer maintain their own Hdfs file
handles. On each read, the io thread will get the Hdfs file
handle from the cache (opening it if necessary) and use
that for the read. This allows multiple scan ranges on the
same file to use the same file handle. Since the file
offsets are no longer consistent for an individual scan
range, all Hdfs reads need to either use hdfsPread or do
a seek before reading. Additionally, since Hdfs read
statistics are maintained on the file handle, the read
statistics must be retrieved and cleared after each read.

To manage contention, the file handle cache is now
partitioned by a hash of the key into independent
caches with independent locks. The allowed capacity
of the file handle cache is split evenly among the
partitions. File handles are evicted independently
for each partition. The file handle cache maintains
ownership of the file handles at all times, but it
will not evict a file handle that is in use.

If max_cached_file_handles is set to 0 or the the
scan range is accessing data cached by Hdfs or the
scan range is remote, the scan range will get a
file handle from the cache and hold it until the
scan range is closed. This mimics the existing behavior,
except the file handle stays in the cache and is owned
by the cache. Since it is in use, it will not be evicted.

If a file handle in the cache becomes invalid,
it may result in Read() calls failing. Consequently,
if Read() encounters an error using a file handle
from the cache, it will destroy the handle and
retry once with a new file handle. Any subsequent
error is unrelated to the file handle cache and
will be returned.

Tests:
query_test/test_hdfs_fd_caching.py copies the files from
an existing table into a new directory and uses that to
create an external table. It queries the external table,
then uses the hdfs commandline to manipulate the hdfs file
(delete, move, etc). It queries again to make sure we
don't crash. Then, it runs "invalidate metadata". It
checks the row counts before the modification and after
"invalidate metadata", but it does not check the results
in between.
custom_cluster/test_hdfs_fd_caching.py starts up a cluster
with a small file handle cache size. It verifies that a
file handle can be reused (i.e. rerunning a query does
not result in more file handles cached). It also verifies
that the cache capacity is enforced.

Change-Id: Ibe5ff60971dd653c3b6a0e13928cfa9fc59d078d
Reviewed-on: http://gerrit.cloudera.org:8080/6478
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/effe973a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/effe973a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/effe973a

Branch: refs/heads/master
Commit: effe973a58ba2147e175aa19aeefc2be1ea840cb
Parents: e89d705
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Mar 23 18:26:51 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat May 27 03:04:38 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc              |  23 +-
 be/src/exec/hdfs-scan-node-base.h               |  32 +-
 be/src/runtime/buffered-block-mgr.h             |   2 +
 be/src/runtime/disk-io-mgr-handle-cache.h       | 163 ++++++++++
 .../runtime/disk-io-mgr-handle-cache.inline.h   | 165 ++++++++++
 be/src/runtime/disk-io-mgr-internal.h           |   6 +
 be/src/runtime/disk-io-mgr-reader-context.cc    |   2 +
 be/src/runtime/disk-io-mgr-scan-range.cc        | 315 ++++++++++++-------
 be/src/runtime/disk-io-mgr.cc                   | 284 +++++++----------
 be/src/runtime/disk-io-mgr.h                    | 141 +++++----
 tests/custom_cluster/test_hdfs_fd_caching.py    |  32 +-
 tests/metadata/test_refresh_partition.py        |   6 +-
 tests/query_test/test_hdfs_fd_caching.py        | 126 +++++---
 13 files changed, 872 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index eee5fbc..89fc469 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -82,25 +82,12 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
                            const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
-      runtime_state_(NULL),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
-      min_max_tuple_desc_(nullptr),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
-      reader_context_(NULL),
-      tuple_desc_(NULL),
-      hdfs_table_(NULL),
-      initial_ranges_issued_(false),
-      counters_running_(false),
-      max_compressed_text_file_length_(NULL),
-      disks_accessed_bitmap_(TUnit::UNIT, 0),
-      bytes_read_local_(NULL),
-      bytes_read_short_circuit_(NULL),
-      bytes_read_dn_cache_(NULL),
-      num_remote_ranges_(NULL),
-      unexpected_remote_bytes_(NULL) {
+      disks_accessed_bitmap_(TUnit::UNIT, 0){
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -439,6 +426,10 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
       TUnit::UNIT);
   unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
       TUnit::BYTES);
+  cached_file_handles_hit_count_ = ADD_COUNTER(runtime_profile(),
+      "CachedFileHandlesHitCount", TUnit::UNIT);
+  cached_file_handles_miss_count_ = ADD_COUNTER(runtime_profile(),
+      "CachedFileHandlesMissCount", TUnit::UNIT);
 
   max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
       "MaxCompressedTextFileLength", TUnit::BYTES);
@@ -882,6 +873,10 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
         runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
     unexpected_remote_bytes_->Set(
         runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
+    cached_file_handles_hit_count_->Set(
+        runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_));
+    cached_file_handles_miss_count_->Set(
+        runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_));
 
     if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 02565ff..abe0fc6 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -299,7 +299,7 @@ class HdfsScanNodeBase : public ScanNode {
   friend class ScannerContext;
   friend class HdfsScanner;
 
-  RuntimeState* runtime_state_;
+  RuntimeState* runtime_state_ = nullptr;
 
   /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
   const int min_max_tuple_id_;
@@ -308,7 +308,7 @@ class HdfsScanNodeBase : public ScanNode {
   vector<ExprContext*> min_max_conjunct_ctxs_;
 
   /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
-  TupleDescriptor* min_max_tuple_desc_;
+  TupleDescriptor* min_max_tuple_desc_ = nullptr;
 
   // Number of header lines to skip at the beginning of each file of this table. Only set
   // to values > 0 for hdfs text files.
@@ -318,10 +318,10 @@ class HdfsScanNodeBase : public ScanNode {
   const int tuple_id_;
 
   /// RequestContext object to use with the disk-io-mgr for reads.
-  DiskIoRequestContext* reader_context_;
+  DiskIoRequestContext* reader_context_ = nullptr;
 
   /// Descriptor for tuples this scan node constructs
-  const TupleDescriptor* tuple_desc_;
+  const TupleDescriptor* tuple_desc_ = nullptr;
 
   /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only
   /// the partition columns for that partition materialized. Used to filter files and scan
@@ -330,7 +330,7 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Descriptor for the hdfs table, including partition and format metadata.
   /// Set in Prepare, owned by RuntimeState
-  const HdfsTableDescriptor* hdfs_table_;
+  const HdfsTableDescriptor* hdfs_table_ = nullptr;
 
   /// The root of the table's Avro schema, if we're scanning an Avro table.
   ScopedAvroSchemaElement avro_schema_;
@@ -361,7 +361,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
   /// the first call to GetNext(). The token manager, in a different thread, will read
   /// this variable.
-  bool initial_ranges_issued_;
+  bool initial_ranges_issued_ = false;
 
   /// Number of files that have not been issued from the scanners.
   AtomicInt32 num_unqueued_files_;
@@ -409,29 +409,35 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// If true, counters are actively running and need to be reported in the runtime
   /// profile.
-  bool counters_running_;
+  bool counters_running_ = false;
 
   /// The size of the largest compressed text file to be scanned. This is used to
   /// estimate scanner thread memory usage.
-  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_ = nullptr;
 
   /// Disk accessed bitmap
   RuntimeProfile::Counter disks_accessed_bitmap_;
 
   /// Total number of bytes read locally
-  RuntimeProfile::Counter* bytes_read_local_;
+  RuntimeProfile::Counter* bytes_read_local_ = nullptr;
 
   /// Total number of bytes read via short circuit read
-  RuntimeProfile::Counter* bytes_read_short_circuit_;
+  RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
 
   /// Total number of bytes read from data node cache
-  RuntimeProfile::Counter* bytes_read_dn_cache_;
+  RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
 
   /// Total number of remote scan ranges
-  RuntimeProfile::Counter* num_remote_ranges_;
+  RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
 
   /// Total number of bytes read remotely that were expected to be local
-  RuntimeProfile::Counter* unexpected_remote_bytes_;
+  RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
+
+  /// Total number of file handle opens where the file handle was present in the cache
+  RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
+
+  /// Total number of file handle opens where the file handle was not in the cache
+  RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr;
 
   /// Pool for allocating some amounts of memory that is shared between scanners.
   /// e.g. partition key tuple and their string buffers

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index bbe8429..ab05329 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -22,6 +22,8 @@
 #include "runtime/tmp-file-mgr.h"
 #include "util/mem-range.h"
 
+#include <boost/unordered_map.hpp>
+
 namespace impala {
 
 class RuntimeState;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
new file mode 100644
index 0000000..ddfb934
--- /dev/null
+++ b/be/src/runtime/disk-io-mgr-handle-cache.h
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H
+
+#include <array>
+#include <list>
+#include <map>
+#include <memory>
+
+#include <boost/thread/mutex.hpp>
+
+#include "common/hdfs.h"
+#include "util/aligned-new.h"
+#include "util/impalad-metrics.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+/// This class is a small wrapper around the hdfsFile handle and the file system
+/// instance which is needed to close the file handle. The handle incorporates
+/// the last modified time of the file when it was opened. This is used to distinguish
+/// between file handles for files that can be updated or overwritten.
+class HdfsFileHandle {
+ public:
+
+  /// Constructor will open the file
+  HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime);
+
+  /// Destructor will close the file handle
+  ~HdfsFileHandle();
+
+  hdfsFile file() const { return hdfs_file_;  }
+  int64_t mtime() const { return mtime_; }
+  bool ok() const { return hdfs_file_ != nullptr; }
+
+ private:
+  hdfsFS fs_;
+  hdfsFile hdfs_file_;
+  int64_t mtime_;
+};
+
+/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between
+/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions.
+/// Each partition operates independently with its own locks, reducing contention
+/// between concurrent threads. The `capacity` is split between the partitions and is
+/// enforced independently.
+///
+/// Threads check out a file handle for exclusive access and return it when finished.
+/// If the file handle is not already present in the cache or all file handles for this
+/// file are checked out, the file handle is constructed and added to the cache.
+/// The cache can contain multiple file handles for the same file. If a file handle
+/// is checked out, it cannot be evicted from the cache. In this case, a cache can
+/// exceed the specified capacity.
+///
+/// The file handle cache is currently not suitable for remote files that maintain a
+/// connection as part of the handle. Most remote systems have a limit on the number
+/// of concurrent connections, and file handles in the cache would be counted towards
+/// that limit.
+///
+/// TODO: If there is a file handle in the cache and the underlying file is deleted,
+/// the file handle might keep the file from being deleted at the OS level. This can
+/// take up disk space and impact correctness. The cache should check periodically to
+/// evict file handles older than some configurable threshold. The cache should also
+/// evict file handles more aggressively if the file handle's mtime is older than the
+/// file's current mtime.
+template <size_t NUM_PARTITIONS>
+class FileHandleCache {
+ public:
+  /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS
+  /// partitions. If the capacity does not split evenly, then the capacity is rounded
+  /// up.
+  FileHandleCache(size_t capacity);
+
+  /// Get a file handle from the cache for the specified filename (fname) and
+  /// last modification time (mtime). This will hash the filename to determine
+  /// which partition to use for this file handle.
+  ///
+  /// If 'require_new_handle' is false and the partition contains an available handle,
+  /// the handle is returned and cache_hit is set to true. Otherwise, the partition will
+  /// try to construct a file handle and add it to the partition. On success, the new
+  /// file handle will be returned with cache_hit set to false. On failure, nullptr will
+  /// be returned. In either case, the partition may evict a file handle to make room
+  /// for the new file handle.
+  ///
+  /// This obtains exclusive control over the returned file handle. It must be paired
+  /// with a call to ReleaseFileHandle to release exclusive control.
+  HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
+      bool require_new_handle, bool* cache_hit);
+
+  /// Release the exclusive hold on the specified file handle (which was obtained
+  /// by calling GetFileHandle). The cache may evict a file handle if the cache is
+  /// above capacity. If 'destroy_handle' is true, immediately remove this handle
+  /// from the cache.
+  void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle);
+
+ private:
+  struct FileHandleEntry;
+  typedef std::multimap<std::string, FileHandleEntry> MapType;
+  typedef std::list<typename MapType::iterator> LruListType;
+
+  struct FileHandleEntry {
+    FileHandleEntry(HdfsFileHandle *fh_in) : fh(fh_in) {}
+    std::unique_ptr<HdfsFileHandle> fh;
+
+    /// in_use is true for a file handle checked out via GetFileHandle() that has not
+    /// been returned via ReleaseFileHandle().
+    bool in_use = false;
+
+    /// Iterator to this element's location in the LRU list. This only has a valid value
+    /// if in_use is false.
+    typename LruListType::iterator lru_entry;
+  };
+
+  /// Each partition operates independently, and thus has its own cache, LRU list,
+  /// and corresponding lock. To avoid contention on the lock_ due to false sharing
+  /// the partitions are aligned to cache line boundaries.
+  struct FileHandleCachePartition : public CacheLineAligned {
+    /// Protects access to cache and lru_list.
+    SpinLock lock;
+
+    /// Multimap from the file name to the file handles for that file. The cache
+    /// can contain multiple file handles for the same file and some may have
+    /// different mtimes if the file is being modified. All file handles are always
+    /// owned by the cache.
+    MapType cache;
+
+    /// The LRU list only contains file handles that are not in use.
+    LruListType lru_list;
+
+    /// Maximum number of file handles in cache without evicting unused file handles.
+    /// It is not a strict limit, and can be exceeded if all file handles are in use.
+    size_t capacity;
+
+    /// Current number of file handles in the cache
+    size_t size;
+  };
+
+  /// If the partition is above its capacity, evict the oldest unused file handles to
+  /// enforce the capacity.
+  void EvictHandles(FileHandleCachePartition& p);
+
+  std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
new file mode 100644
index 0000000..8d3a9e0
--- /dev/null
+++ b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/disk-io-mgr-handle-cache.h"
+#include "util/hash-util.h"
+
+#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
+#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
+
+namespace impala {
+
+HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname,
+    int64_t mtime)
+    : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
+  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
+  VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
+}
+
+HdfsFileHandle::~HdfsFileHandle() {
+  if (hdfs_file_ != nullptr && fs_ != nullptr) {
+    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
+    VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
+    hdfsCloseFile(fs_, hdfs_file_);
+  }
+  fs_ = nullptr;
+  hdfs_file_ = nullptr;
+}
+
+template <size_t NUM_PARTITIONS>
+FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity) {
+  DCHECK_GT(NUM_PARTITIONS, 0);
+  size_t remainder = capacity % NUM_PARTITIONS;
+  size_t base_capacity = capacity / NUM_PARTITIONS;
+  size_t partition_capacity = (remainder > 0 ? base_capacity + 1 : base_capacity);
+  for (FileHandleCachePartition& p : cache_partitions_) {
+    p.size = 0;
+    p.capacity = partition_capacity;
+  }
+}
+
+template <size_t NUM_PARTITIONS>
+HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle(
+    const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle,
+    bool* cache_hit) {
+  // Hash the key and get appropriate partition
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  FileHandleCachePartition& p = cache_partitions_[index];
+  boost::lock_guard<SpinLock> g(p.lock);
+  pair<typename MapType::iterator, typename MapType::iterator> range =
+    p.cache.equal_range(*fname);
+
+  // If this requires a new handle, skip to the creation codepath. Otherwise,
+  // find an unused entry with the same mtime
+  FileHandleEntry* ret_elem = nullptr;
+  if (!require_new_handle) {
+    while (range.first != range.second) {
+      FileHandleEntry* elem = &range.first->second;
+      if (!elem->in_use && elem->fh->mtime() == mtime) {
+        // remove from lru
+        p.lru_list.erase(elem->lru_entry);
+        ret_elem = elem;
+        *cache_hit = true;
+        break;
+      }
+      ++range.first;
+    }
+  }
+
+  // There was no entry that was free or caller asked for a new handle
+  if (!ret_elem) {
+    *cache_hit = false;
+    // Create a new entry and put it in the map
+    HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime);
+    if (!new_fh->ok()) {
+      delete new_fh;
+      return nullptr;
+    }
+    typename MapType::iterator new_it = p.cache.emplace_hint(range.second, *fname,
+        new_fh);
+    ret_elem = &new_it->second;
+    ++p.size;
+    if (p.size > p.capacity) EvictHandles(p);
+  }
+
+  DCHECK(ret_elem->fh.get() != nullptr);
+  DCHECK(!ret_elem->in_use);
+  ret_elem->in_use = true;
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
+  return ret_elem->fh.get();
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname,
+    HdfsFileHandle* fh, bool destroy_handle) {
+  // Hash the key and get appropriate partition
+  int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS;
+  FileHandleCachePartition& p = cache_partitions_[index];
+  boost::lock_guard<SpinLock> g(p.lock);
+  pair<typename MapType::iterator, typename MapType::iterator> range =
+    p.cache.equal_range(*fname);
+
+  // TODO: This can be optimized by maintaining some state in the file handle about
+  // its location in the map.
+  typename MapType::iterator release_it = range.first;
+  while (release_it != range.second) {
+    FileHandleEntry* elem = &release_it->second;
+    if (elem->fh.get() == fh) break;
+    ++release_it;
+  }
+  DCHECK(release_it != range.second);
+
+  // This file handle is no longer referenced
+  FileHandleEntry* release_elem = &release_it->second;
+  DCHECK(release_elem->in_use);
+  release_elem->in_use = false;
+  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
+  if (destroy_handle) {
+    --p.size;
+    p.cache.erase(release_it);
+    return;
+  }
+  // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces
+  // this buffering so that the file handle takes up less memory when in the cache.
+  // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero
+  // return code, and we close the file handle and remove it from the cache.
+  if (hdfsUnbufferFile(release_elem->fh->file()) == 0) {
+    release_elem->lru_entry = p.lru_list.insert(p.lru_list.end(), release_it);
+    if (p.size > p.capacity) EvictHandles(p);
+  } else {
+    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
+              << fname;
+    --p.size;
+    p.cache.erase(release_it);
+  }
+}
+
+template <size_t NUM_PARTITIONS>
+void FileHandleCache<NUM_PARTITIONS>::EvictHandles(
+    FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) {
+  while (p.size > p.capacity) {
+    if (p.lru_list.size() == 0) break;
+    typename MapType::iterator evict_it = p.lru_list.front();
+    DCHECK(!evict_it->second.in_use);
+    p.cache.erase(evict_it);
+    p.lru_list.pop_front();
+    --p.size;
+  }
+}
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index 3c5c5ba..30bcd60 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -248,6 +248,12 @@ class DiskIoRequestContext {
   /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
   AtomicInt32 num_unstarted_scan_ranges_;
 
+  /// Total number of file handle opens where the file handle was present in the cache
+  AtomicInt32 cached_file_handles_hit_count_;
+
+  /// Total number of file handle opens where the file handle was not in the cache
+  AtomicInt32 cached_file_handles_miss_count_;
+
   /// The number of buffers that are being used for this reader. This is the sum
   /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
   /// to be queued). This includes both IOMgr-allocated buffers and client-provided

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index 0876c6d..6f3fe77 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -164,6 +164,8 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) {
   bytes_read_short_circuit_.Store(0);
   bytes_read_dn_cache_.Store(0);
   unexpected_remote_bytes_.Store(0);
+  cached_file_handles_hit_count_.Store(0);
+  cached_file_handles_miss_count_.Store(0);
   initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;
 
   DCHECK(ready_to_start_ranges_.empty());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9ccd1da..3fcd59d 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -48,7 +48,7 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) {
     DCHECK(!eosr_queued_);
     if (is_cancelled_) {
       // Return the buffer, this range has been cancelled
-      if (buffer->buffer_ != NULL) {
+      if (buffer->buffer_ != nullptr) {
         io_mgr_->num_buffers_in_readers_.Add(1);
         reader_->num_buffers_in_reader_.Add(1);
       }
@@ -75,7 +75,7 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) {
 }
 
 Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
-  *buffer = NULL;
+  *buffer = nullptr;
 
   {
     unique_lock<mutex> scan_range_lock(lock_);
@@ -115,7 +115,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
   Status status = (*buffer)->status_;
   if (!status.ok()) {
     (*buffer)->Return();
-    *buffer = NULL;
+    *buffer = nullptr;
     return status;
   }
 
@@ -132,7 +132,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
     reader_->blocked_ranges_.Remove(this);
     Cancel(reader_->status_);
     (*buffer)->Return();
-    *buffer = NULL;
+    *buffer = nullptr;
     return status_;
   }
 
@@ -149,7 +149,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
 
 void DiskIoMgr::ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
-  if (io_mgr_ == NULL) return;
+  if (io_mgr_ == nullptr) return;
 
   DCHECK(!status.ok());
   {
@@ -189,7 +189,7 @@ string DiskIoMgr::ScanRange::DebugString() const {
      << " len=" << len_ << " bytes_read=" << bytes_read_
      << " buffer_queue=" << ready_buffers_.size()
      << " capacity=" << ready_buffers_capacity_
-     << " hdfs_file=" << hdfs_file_;
+     << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
 }
 
@@ -210,18 +210,15 @@ bool DiskIoMgr::ScanRange::Validate() {
 
 DiskIoMgr::ScanRange::ScanRange(int capacity)
   : RequestRange(RequestType::READ),
-    meta_data_(NULL),
     try_cache_(false),
     expected_local_(false),
-    io_mgr_(NULL),
-    reader_(NULL),
-    hdfs_file_(NULL),
+    num_remote_bytes_(0),
     external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
     ready_buffers_capacity_(capacity),
     mtime_(-1) {}
 
 DiskIoMgr::ScanRange::~ScanRange() {
-  DCHECK(hdfs_file_ == NULL) << "File was not closed.";
+  DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
   DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
       << "Cached buffer was not released.";
 }
@@ -229,10 +226,11 @@ DiskIoMgr::ScanRange::~ScanRange() {
 void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
     int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
   DCHECK(ready_buffers_.empty());
-  DCHECK(file != NULL);
+  DCHECK(file != nullptr);
   DCHECK_GE(len, 0);
   DCHECK_GE(offset, 0);
-  DCHECK(buffer_opts.client_buffer_ == NULL || buffer_opts.client_buffer_len_ >= len_);
+  DCHECK(buffer_opts.client_buffer_ == nullptr ||
+         buffer_opts.client_buffer_len_ >= len_);
   fs_ = fs;
   file_ = file;
   len_ = len;
@@ -241,29 +239,30 @@ void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64
   try_cache_ = buffer_opts.try_cache_;
   mtime_ = buffer_opts.mtime_;
   expected_local_ = expected_local;
+  num_remote_bytes_ = 0;
   meta_data_ = meta_data;
-  if (buffer_opts.client_buffer_ != NULL) {
+  if (buffer_opts.client_buffer_ != nullptr) {
     external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
     client_buffer_.data = buffer_opts.client_buffer_;
     client_buffer_.len = buffer_opts.client_buffer_len_;
   } else {
     external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
   }
-  io_mgr_ = NULL;
-  reader_ = NULL;
-  hdfs_file_ = NULL;
+  io_mgr_ = nullptr;
+  reader_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
 }
 
 void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
-  DCHECK(hdfs_file_ == NULL);
-  DCHECK(local_file_ == NULL);
+  DCHECK(exclusive_hdfs_fh_ == nullptr);
+  DCHECK(local_file_ == nullptr);
   // Reader must provide MemTracker or a buffer.
   DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
-      || reader->mem_tracker_ != NULL);
+      || reader->mem_tracker_ != nullptr);
   io_mgr_ = io_mgr;
   reader_ = reader;
-  local_file_ = NULL;
-  hdfs_file_ = NULL;
+  local_file_ = nullptr;
+  exclusive_hdfs_fh_ = nullptr;
   bytes_read_ = 0;
   is_cancelled_ = false;
   eosr_queued_= false;
@@ -276,30 +275,46 @@ void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext*
   DCHECK(Validate()) << DebugString();
 }
 
-Status DiskIoMgr::ScanRange::Open() {
+Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
   if (is_cancelled_) return Status::CANCELLED;
 
-  if (fs_ != NULL) {
-    if (hdfs_file_ != NULL) return Status::OK();
-    hdfs_file_ = io_mgr_->OpenHdfsFile(fs_, file(), mtime());
-    if (hdfs_file_ == NULL) {
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
+    // With file handle caching, the scan range does not maintain its own
+    // hdfs file handle. File handle caching is only used for local files,
+    // so s3 and remote filesystems should obtain an exclusive file handle
+    // for each scan range.
+    if (use_file_handle_cache && expected_local_) return Status::OK();
+    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+        mtime(), reader_);
+    if (exclusive_hdfs_fh_ == nullptr) {
       return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
     }
 
-    if (hdfsSeek(fs_, hdfs_file_->file(), offset_) != 0) {
-      io_mgr_->CacheOrCloseFileHandle(file(), hdfs_file_, false);
-      hdfs_file_ = NULL;
-      string error_msg = GetHdfsErrorMsg("");
-      stringstream ss;
-      ss << "Error seeking to " << offset_ << " in file: " << file_ << " " << error_msg;
-      return Status(ss.str());
+    int num_retries = 0;
+    while (true) {
+      if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) == 0) break;
+      // Seek failed. If already retried once, return error. Otherwise, destroy
+      // the current file handle and retry with a new handle.
+      DCHECK_LE(num_retries, 1);
+      if (num_retries == 1) {
+        io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_);
+        exclusive_hdfs_fh_ = nullptr;
+        string error_msg = GetHdfsErrorMsg("");
+        stringstream ss;
+        ss << "Error seeking to " << offset_ << " in file: " << file_ << " " << error_msg;
+        return Status(ss.str());
+      }
+      ++num_retries;
+      RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
+          mtime(), &exclusive_hdfs_fh_));
     }
   } else {
-    if (local_file_ != NULL) return Status::OK();
+    if (local_file_ != nullptr) return Status::OK();
 
     local_file_ = fopen(file(), "r");
-    if (local_file_ == NULL) {
+    if (local_file_ == nullptr) {
       string error_msg = GetStrErrMsg();
       stringstream ss;
       ss << "Could not open file: " << file_ << ": " << error_msg;
@@ -307,7 +322,7 @@ Status DiskIoMgr::ScanRange::Open() {
     }
     if (fseek(local_file_, offset_, SEEK_SET) == -1) {
       fclose(local_file_);
-      local_file_ = NULL;
+      local_file_ = nullptr;
       string error_msg = GetStrErrMsg();
       stringstream ss;
       ss << "Could not seek to " << offset_ << " for file: " << file_
@@ -315,7 +330,7 @@ Status DiskIoMgr::ScanRange::Open() {
       return Status(ss.str());
     }
   }
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != NULL) {
+  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
     ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
   }
   return Status::OK();
@@ -323,57 +338,52 @@ Status DiskIoMgr::ScanRange::Open() {
 
 void DiskIoMgr::ScanRange::Close() {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (fs_ != NULL) {
-    if (hdfs_file_ == NULL) return;
+  bool closed_file = false;
+  if (fs_ != nullptr) {
+    if (exclusive_hdfs_fh_ != nullptr) {
+      GetHdfsStatistics(exclusive_hdfs_fh_->file());
+
+      if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
+        hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
+        cached_buffer_ = nullptr;
+        external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+      }
+
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_);
+      exclusive_hdfs_fh_ = nullptr;
+      closed_file = true;
+    }
 
-    struct hdfsReadStatistics* stats;
-    if (IsHdfsPath(file())) {
-      int success = hdfsFileGetReadStatistics(hdfs_file_->file(), &stats);
+    if (FLAGS_use_hdfs_pread) {
+      // Update Hedged Read Metrics.
+      // We call it only if the --use_hdfs_pread flag is set, to avoid having the
+      // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
+      // otherwise.
+      struct hdfsHedgedReadMetrics* hedged_metrics;
+      int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
       if (success == 0) {
-        reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
-        reader_->bytes_read_short_circuit_.Add(stats->totalShortCircuitBytesRead);
-        reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
-        if (stats->totalLocalBytesRead != stats->totalBytesRead) {
-          reader_->num_remote_ranges_.Add(1);
-          if (expected_local_) {
-            int remote_bytes = stats->totalBytesRead - stats->totalLocalBytesRead;
-            reader_->unexpected_remote_bytes_.Add(remote_bytes);
-            VLOG_FILE << "Unexpected remote HDFS read of "
-                      << PrettyPrinter::Print(remote_bytes, TUnit::BYTES)
-                      << " for file '" << file_ << "'";
-          }
-        }
-        hdfsFileFreeReadStatistics(stats);
+        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
+        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
       }
+      hdfsFreeHedgedReadMetrics(hedged_metrics);
+    }
 
-      if (FLAGS_use_hdfs_pread) {
-        // Update Hedged Read Metrics.
-        // We call it only if the --use_hdfs_pread flag is set, to avoid having the
-        // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
-        // otherwise.
-        struct hdfsHedgedReadMetrics* hedged_metrics;
-        success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
-        if (success == 0) {
-          ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
-          ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
-        }
-        hdfsFreeHedgedReadMetrics(hedged_metrics);
+    if (num_remote_bytes_ > 0) {
+      reader_->num_remote_ranges_.Add(1L);
+      if (expected_local_) {
+        reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
+        VLOG_FILE << "Unexpected remote HDFS read of "
+                  << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
+                  << " for file '" << file_ << "'";
       }
     }
-    if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
-      hadoopRzBufferFree(hdfs_file_->file(), cached_buffer_);
-      cached_buffer_ = NULL;
-      external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
-    }
-    io_mgr_->CacheOrCloseFileHandle(file(), hdfs_file_, false);
-    VLOG_FILE << "Cache HDFS file handle file=" << file();
-    hdfs_file_ = NULL;
   } else {
-    if (local_file_ == NULL) return;
+    if (local_file_ == nullptr) return;
     fclose(local_file_);
-    local_file_ = NULL;
+    local_file_ = nullptr;
+    closed_file = true;
   }
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != NULL) {
+  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
     ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
   }
 }
@@ -407,34 +417,101 @@ Status DiskIoMgr::ScanRange::Read(
   int bytes_to_read = min(len_ - bytes_read_, buffer_len);
   DCHECK_GE(bytes_to_read, 0);
 
-  if (fs_ != NULL) {
-    DCHECK(hdfs_file_ != NULL);
+  if (fs_ != nullptr) {
+    HdfsFileHandle* borrowed_hdfs_fh = nullptr;
+    hdfsFile hdfs_file;
+
+    // If the scan range has an exclusive file handle, use it. Otherwise, borrow
+    // a file handle from the cache.
+    if (exclusive_hdfs_fh_ != nullptr) {
+      hdfs_file = exclusive_hdfs_fh_->file();
+    } else {
+      borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
+          mtime(), reader_);
+      if (borrowed_hdfs_fh == nullptr) {
+        return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+      }
+      hdfs_file = borrowed_hdfs_fh->file();
+    }
+
     int64_t max_chunk_size = MaxReadChunkSize();
+    Status status = Status::OK();
     while (*bytes_read < bytes_to_read) {
       int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
       DCHECK_GE(chunk_size, 0);
       // The hdfsRead() length argument is an int.
       DCHECK_LE(chunk_size, numeric_limits<int>::max());
-      int last_read = -1;
-      if (FLAGS_use_hdfs_pread) {
-        // bytes_read_ is only updated after the while loop
-        int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
-        last_read = hdfsPread(fs_, hdfs_file_->file(), position_in_file,
-            buffer + *bytes_read, chunk_size);
-      } else {
-        last_read = hdfsRead(fs_, hdfs_file_->file(), buffer + *bytes_read, chunk_size);
+      int current_bytes_read = -1;
+      // bytes_read_ is only updated after the while loop
+      int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
+      int num_retries = 0;
+      while (true) {
+        status = Status::OK();
+        // For file handles from the cache, any of the below file operations may fail
+        // due to a bad file handle. In each case, record the error, but allow for a
+        // retry to fix it.
+        if (FLAGS_use_hdfs_pread) {
+          current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
+              buffer + *bytes_read, chunk_size);
+          if (current_bytes_read == -1) {
+            status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+          }
+        } else {
+          // If the file handle is borrowed, it may not be at the appropriate
+          // location. Seek to the appropriate location.
+          bool seek_failed = false;
+          if (borrowed_hdfs_fh != nullptr) {
+            if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
+              string error_msg = GetHdfsErrorMsg("");
+              stringstream ss;
+              ss << "Error seeking to " << position_in_file << " in file: "
+                 << file_ << " " << error_msg;
+              status = Status(ss.str());
+              seek_failed = true;
+            }
+          }
+          if (!seek_failed) {
+            current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
+                chunk_size);
+            if (current_bytes_read == -1) {
+              status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+            }
+          }
+        }
+
+        // Do not retry:
+        // - if read was successful (current_bytes_read != -1)
+        // - or if already retried once
+        DCHECK_LE(num_retries, 1);
+        if (current_bytes_read != -1 || num_retries == 1) {
+          break;
+        }
+        // The error may be due to a bad file handle. Reopen the file handle and retry.
+        ++num_retries;
+        HdfsFileHandle** fh_to_refresh =
+            (borrowed_hdfs_fh != nullptr ? &borrowed_hdfs_fh : &exclusive_hdfs_fh_);
+        RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
+            mtime(), fh_to_refresh));
+        hdfs_file = (*fh_to_refresh)->file();
       }
-      if (last_read == -1) {
-        return Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-      } else if (last_read == 0) {
+      if (!status.ok()) break;
+      if (current_bytes_read == 0) {
         // No more bytes in the file. The scan range went past the end.
         *eosr = true;
         break;
       }
-      *bytes_read += last_read;
+      *bytes_read += current_bytes_read;
+
+      // Collect and accumulate statistics
+      GetHdfsStatistics(hdfs_file);
+    }
+
+    if (borrowed_hdfs_fh != nullptr) {
+      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh);
     }
+    if (!status.ok()) return status;
   } else {
-    DCHECK(local_file_ != NULL);
+    DCHECK(local_file_ != nullptr);
     *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
     DCHECK_GE(*bytes_read, 0);
     DCHECK_LE(*bytes_read, bytes_to_read);
@@ -462,21 +539,32 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   DCHECK(try_cache_);
   DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
-  Status status = Open();
+  Status status = Open(false);
   if (!status.ok()) return status;
 
   // Cached reads not supported on local filesystem.
-  if (fs_ == NULL) return Status::OK();
+  if (fs_ == nullptr) return Status::OK();
 
   {
     unique_lock<mutex> hdfs_lock(hdfs_lock_);
     if (is_cancelled_) return Status::CANCELLED;
 
-    DCHECK(hdfs_file_ != NULL);
+    DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
-    cached_buffer_ =
-        hadoopReadZero(hdfs_file_->file(), io_mgr_->cached_read_options_, len());
-    if (cached_buffer_ != NULL) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+    int num_retries = 0;
+    while (true) {
+      cached_buffer_ = hadoopReadZero(exclusive_hdfs_fh_->file(),
+          io_mgr_->cached_read_options_, len());
+      if (cached_buffer_ != nullptr) {
+        external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
+        break;
+      }
+      DCHECK_LE(num_retries, 1);
+      if (num_retries == 1) break;
+      ++num_retries;
+      RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(), mtime(),
+          &exclusive_hdfs_fh_));
+    }
   }
   // Data was not cached, caller will fall back to normal read path.
   if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
@@ -504,19 +592,36 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   }
 
   // Create a single buffer desc for the entire scan range and enqueue that.
-  // 'mem_tracker' is NULL because the memory is owned by the HDFS java client,
+  // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
   // not the Impala backend.
-  BufferDescriptor* desc =
-      io_mgr_->GetBufferDesc(reader_, NULL, this, reinterpret_cast<uint8_t*>(buffer), 0);
+  BufferDescriptor* desc = io_mgr_->GetBufferDesc(reader_, nullptr, this,
+      reinterpret_cast<uint8_t*>(buffer), 0);
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
   EnqueueBuffer(desc);
-  if (reader_->bytes_read_counter_ != NULL) {
+  if (reader_->bytes_read_counter_ != nullptr) {
     COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
   }
   *read_succeeded = true;
   reader_->num_used_buffers_.Add(1);
   return Status::OK();
 }
+
+void DiskIoMgr::ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
+  struct hdfsReadStatistics* stats;
+  if (IsHdfsPath(file())) {
+    int success = hdfsFileGetReadStatistics(hdfs_file, &stats);
+    if (success == 0) {
+      reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
+      reader_->bytes_read_short_circuit_.Add(stats->totalShortCircuitBytesRead);
+      reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (stats->totalLocalBytesRead != stats->totalBytesRead) {
+        num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
+      }
+      hdfsFileFreeReadStatistics(stats);
+    }
+    hdfsFileClearReadStatistics(hdfs_file);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/effe973a/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 1f28b1a..a6d3c19 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "runtime/disk-io-mgr.h"
+#include "runtime/disk-io-mgr-handle-cache.inline.h"
 #include "runtime/disk-io-mgr-internal.h"
 
 #include <boost/algorithm/string.hpp>
@@ -63,11 +64,11 @@ DEFINE_int32(max_free_io_buffers, 128,
     "For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
 
 // The number of cached file handles defines how much memory can be used per backend for
-// caching frequently used file handles. Currently, we assume that approximately 2kB data
-// are associated with a single file handle. 10k file handles will thus reserve ~20MB
-// data. The actual amount of memory that is associated with a file handle can be larger
+// caching frequently used file handles. Measurements indicate that a single file handle
+// uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
+// The actual amount of memory that is associated with a file handle can be larger
 // or smaller, depending on the replication factor for this file or the path name.
-DEFINE_uint64(max_cached_file_handles, 0, "Maximum number of HDFS file handles "
+DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
     "that will be cached. Disabled if set to 0.");
 
 // Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
@@ -91,28 +92,6 @@ static inline bool is_file_handle_caching_enabled() {
 }
 }
 
-/// This method is used to clean up resources upon eviction of a cache file handle.
-void DiskIoMgr::HdfsCachedFileHandle::Release(DiskIoMgr::HdfsCachedFileHandle** h) {
-  ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
-  VLOG_FILE << "Cached file handle evicted, hdfsCloseFile() fid=" << (*h)->hdfs_file_;
-  delete (*h);
-}
-
-DiskIoMgr::HdfsCachedFileHandle::HdfsCachedFileHandle(const hdfsFS& fs, const char* fname,
-    int64_t mtime)
-    : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) {
-  VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
-}
-
-DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
-  if (hdfs_file_ != NULL && fs_ != NULL) {
-    VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
-    hdfsCloseFile(fs_, hdfs_file_);
-  }
-  fs_ = NULL;
-  hdfs_file_ = NULL;
-}
-
 // This class provides a cache of DiskIoRequestContext objects.  DiskIoRequestContexts
 // are recycled. This is good for locality as well as lock contention.  The cache has
 // the property that regardless of how many clients get added/removed, the memory
@@ -217,11 +196,11 @@ DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : io_mgr_(io_mg
 }
 
 void DiskIoMgr::BufferDescriptor::Reset() {
-  DCHECK(io_mgr_ != NULL);
-  reader_ = NULL;
-  scan_range_ = NULL;
-  mem_tracker_ = NULL;
-  buffer_ = NULL;
+  DCHECK(io_mgr_ != nullptr);
+  reader_ = nullptr;
+  scan_range_ = nullptr;
+  mem_tracker_ = nullptr;
+  buffer_ = nullptr;
   buffer_len_ = 0;
   len_ = 0;
   eosr_ = false;
@@ -231,13 +210,13 @@ void DiskIoMgr::BufferDescriptor::Reset() {
 
 void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, ScanRange* range,
     uint8_t* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
-  DCHECK(io_mgr_ != NULL);
-  DCHECK(buffer_ == NULL);
-  DCHECK(range != NULL);
-  DCHECK(buffer != NULL);
+  DCHECK(io_mgr_ != nullptr);
+  DCHECK(buffer_ == nullptr);
+  DCHECK(range != nullptr);
+  DCHECK(buffer != nullptr);
   DCHECK_GE(buffer_len, 0);
   DCHECK_NE(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
-      mem_tracker == NULL);
+      mem_tracker == nullptr);
   reader_ = reader;
   scan_range_ = range;
   mem_tracker_ = mem_tracker;
@@ -250,18 +229,18 @@ void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, ScanRange*
 }
 
 void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
-  DCHECK(dst != NULL);
+  DCHECK(dst != nullptr);
   DCHECK(!is_client_buffer());
   // Memory of cached buffers is not tracked against a tracker.
   if (is_cached()) return;
-  DCHECK(mem_tracker_ != NULL);
+  DCHECK(mem_tracker_ != nullptr);
   dst->Consume(buffer_len_);
   mem_tracker_->Release(buffer_len_);
   mem_tracker_ = dst;
 }
 
 void DiskIoMgr::BufferDescriptor::Return() {
-  DCHECK(io_mgr_ != NULL);
+  DCHECK(io_mgr_ != nullptr);
   io_mgr_->ReturnBuffer(this);
 }
 
@@ -296,13 +275,11 @@ DiskIoMgr::DiskIoMgr() :
     num_threads_per_disk_(FLAGS_num_threads_per_disk),
     max_buffer_size_(FLAGS_read_size),
     min_buffer_size_(FLAGS_min_buffer_size),
-    cached_read_options_(NULL),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-        FileSystemUtil::MaxNumFileHandles()),
-        &HdfsCachedFileHandle::Release) {
+        FileSystemUtil::MaxNumFileHandles())) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = FLAGS_num_disks == 0 ? DiskInfo::num_disks() : FLAGS_num_disks;
@@ -315,12 +292,11 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_s
     num_threads_per_disk_(threads_per_disk),
     max_buffer_size_(max_buffer_size),
     min_buffer_size_(min_buffer_size),
-    cached_read_options_(NULL),
     shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
-            FileSystemUtil::MaxNumFileHandles()), &HdfsCachedFileHandle::Release) {
+        FileSystemUtil::MaxNumFileHandles())) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
@@ -332,7 +308,7 @@ DiskIoMgr::~DiskIoMgr() {
   shut_down_ = true;
   // Notify all worker threads and shut them down.
   for (int i = 0; i < disk_queues_.size(); ++i) {
-    if (disk_queues_[i] == NULL) continue;
+    if (disk_queues_[i] == nullptr) continue;
     {
       // This lock is necessary to properly use the condition var to notify
       // the disk worker threads.  The readers also grab this lock so updates
@@ -344,7 +320,7 @@ DiskIoMgr::~DiskIoMgr() {
   disk_thread_group_.JoinAll();
 
   for (int i = 0; i < disk_queues_.size(); ++i) {
-    if (disk_queues_[i] == NULL) continue;
+    if (disk_queues_[i] == nullptr) continue;
     int disk_id = disk_queues_[i]->disk_id;
     for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
         it != disk_queues_[i]->request_contexts.end(); ++it) {
@@ -354,7 +330,7 @@ DiskIoMgr::~DiskIoMgr() {
     }
   }
 
-  DCHECK(request_context_cache_.get() == NULL ||
+  DCHECK(request_context_cache_.get() == nullptr ||
       request_context_cache_->ValidateAllInactive())
       << endl << DebugString();
   DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
@@ -371,13 +347,13 @@ DiskIoMgr::~DiskIoMgr() {
     delete disk_queues_[i];
   }
 
-  if (free_buffer_mem_tracker_ != NULL) free_buffer_mem_tracker_->UnregisterFromParent();
+  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->UnregisterFromParent();
 
-  if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
+  if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
 }
 
 Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
-  DCHECK(process_mem_tracker != NULL);
+  DCHECK(process_mem_tracker != nullptr);
   free_buffer_mem_tracker_.reset(
       new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
 
@@ -405,12 +381,12 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   request_context_cache_.reset(new RequestContextCache(this));
 
   cached_read_options_ = hadoopRzOptionsAlloc();
-  DCHECK(cached_read_options_ != NULL);
+  DCHECK(cached_read_options_ != nullptr);
   // Disable checksumming for cached reads.
   int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true);
   DCHECK_EQ(ret, 0);
   // Disable automatic fallback for cached reads.
-  ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, NULL);
+  ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, nullptr);
   DCHECK_EQ(ret, 0);
 
   return Status::OK();
@@ -418,7 +394,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
 
 void DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
     MemTracker* mem_tracker) {
-  DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
+  DCHECK(request_context_cache_.get() != nullptr) << "Must call Init() first.";
   *request_context = request_context_cache_->GetNewContext();
   (*request_context)->Reset(mem_tracker);
 }
@@ -515,6 +491,14 @@ int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
   return reader->unexpected_remote_bytes_.Load();
 }
 
+int DiskIoMgr::cached_file_handles_hit_count(DiskIoRequestContext* reader) const {
+  return reader->cached_file_handles_hit_count_.Load();
+}
+
+int DiskIoMgr::cached_file_handles_miss_count(DiskIoRequestContext* reader) const {
+  return reader->cached_file_handles_miss_count_.Load();
+}
+
 int64_t DiskIoMgr::GetReadThroughput() {
   return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
 }
@@ -585,9 +569,9 @@ Status DiskIoMgr::AddScanRange(
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
 Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
-  DCHECK(reader != NULL);
-  DCHECK(range != NULL);
-  *range = NULL;
+  DCHECK(reader != nullptr);
+  DCHECK(range != nullptr);
+  *range = nullptr;
   Status status = Status::OK();
 
   unique_lock<mutex> reader_lock(reader->lock_);
@@ -617,7 +601,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
       // This range ended up not being cached. Loop again and pick up a new range.
       reader->AddRequestRange(*range, false);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
-      *range = NULL;
+      *range = nullptr;
       continue;
     }
 
@@ -625,12 +609,12 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
       reader->ready_to_start_ranges_cv_.wait(reader_lock);
     } else {
       *range = reader->ready_to_start_ranges_.Dequeue();
-      DCHECK(*range != NULL);
+      DCHECK(*range != nullptr);
       int disk_id = (*range)->disk_id();
       DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start());
-      // Set this to NULL, the next time this disk runs for this reader, it will
+      // Set this to nullptr, the next time this disk runs for this reader, it will
       // get another range ready.
-      reader->disk_states_[disk_id].set_next_scan_range_to_start(NULL);
+      reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
       reader->ScheduleScanRange(*range);
       break;
     }
@@ -640,9 +624,9 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
 
 Status DiskIoMgr::Read(DiskIoRequestContext* reader,
     ScanRange* range, BufferDescriptor** buffer) {
-  DCHECK(range != NULL);
-  DCHECK(buffer != NULL);
-  *buffer = NULL;
+  DCHECK(range != nullptr);
+  DCHECK(buffer != nullptr);
+  *buffer = nullptr;
 
   if (range->len() > max_buffer_size_
       && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
@@ -655,17 +639,17 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader,
   ranges.push_back(range);
   RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
   RETURN_IF_ERROR(range->GetNext(buffer));
-  DCHECK((*buffer) != NULL);
+  DCHECK((*buffer) != nullptr);
   DCHECK((*buffer)->eosr());
   return Status::OK();
 }
 
 void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
-  DCHECK(buffer_desc != NULL);
-  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
+  DCHECK(buffer_desc != nullptr);
+  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
 
   DiskIoRequestContext* reader = buffer_desc->reader_;
-  if (buffer_desc->buffer_ != NULL) {
+  if (buffer_desc->buffer_ != nullptr) {
     if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
       // Buffers the were not allocated by DiskIoMgr don't need to be freed.
       FreeBufferMemory(buffer_desc);
@@ -673,7 +657,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
     num_buffers_in_readers_.Add(-1);
     reader->num_buffers_in_reader_.Add(-1);
   } else {
-    // A NULL buffer means there was an error in which case there is no buffer
+    // A nullptr buffer means there was an error in which case there is no buffer
     // to return.
   }
 
@@ -687,7 +671,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
 }
 
 void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
-  DCHECK(desc != NULL);
+  DCHECK(desc != nullptr);
   desc->Reset();
   unique_lock<mutex> lock(free_buffers_lock_);
   DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
@@ -723,24 +707,24 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
 
   // Track memory against the reader. This is checked the next time we start
   // a read for the next reader in DiskIoMgr::GetNextScanRange().
-  DCHECK(reader->mem_tracker_ != NULL);
+  DCHECK(reader->mem_tracker_ != nullptr);
   reader->mem_tracker_->Consume(buffer_size);
 
-  uint8_t* buffer = NULL;
+  uint8_t* buffer = nullptr;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
     if (free_buffers_[idx].empty()) {
       num_allocated_buffers_.Add(1);
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
         ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
       }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
         ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
       }
       // We already tracked this memory against the reader's MemTracker.
       buffer = new uint8_t[buffer_size];
     } else {
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
         ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
       }
       buffer = free_buffers_[idx].front();
@@ -750,9 +734,9 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
   }
 
   // Validate more invariants.
-  DCHECK(range != NULL);
-  DCHECK(reader != NULL);
-  DCHECK(buffer != NULL);
+  DCHECK(range != nullptr);
+  DCHECK(reader != nullptr);
+  DCHECK(buffer != nullptr);
   return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size);
 }
 
@@ -778,13 +762,13 @@ void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
     if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
   }
 
-  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
     ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
   }
-  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
     ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
   }
-  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
     ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
   }
 }
@@ -804,7 +788,7 @@ void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
     if (!FLAGS_disable_mem_pools &&
         free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
       free_buffers_[idx].push_back(buffer);
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
+      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
         ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
       }
       // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
@@ -816,10 +800,10 @@ void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
     } else {
       num_allocated_buffers_.Add(-1);
       delete[] buffer;
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
+      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
         ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
       }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
+      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
         ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
       }
     }
@@ -827,7 +811,7 @@ void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
 
   // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
   desc->mem_tracker_->Release(buffer_size);
-  desc->buffer_ = NULL;
+  desc->buffer_ = nullptr;
 }
 
 // This function gets the next RequestRange to work on for this disk. It checks for
@@ -842,12 +826,12 @@ void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
 bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     DiskIoRequestContext** request_context) {
   int disk_id = disk_queue->disk_id;
-  *range = NULL;
+  *range = nullptr;
 
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
-    *request_context = NULL;
-    DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
+    *request_context = nullptr;
+    DiskIoRequestContext::PerDiskState* request_disk_state = nullptr;
     {
       unique_lock<mutex> disk_lock(disk_queue->lock);
 
@@ -865,7 +849,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       // TODO: revisit.
       *request_context = disk_queue->request_contexts.front();
       disk_queue->request_contexts.pop_front();
-      DCHECK(*request_context != NULL);
+      DCHECK(*request_context != nullptr);
       request_disk_state = &((*request_context)->disk_states_[disk_id]);
       request_disk_state->IncrementRequestThreadAndDequeue();
     }
@@ -884,7 +868,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     // TODO: IMPALA-3209: we should not force a reader over its memory limit by
     // pushing more buffers to it. Most readers can make progress and operate within
     // a fixed memory limit.
-    if ((*request_context)->mem_tracker_ != NULL
+    if ((*request_context)->mem_tracker_ != nullptr
         && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
       (*request_context)->Cancel(Status::MemLimitExceeded());
     }
@@ -902,7 +886,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
         << (*request_context)->DebugString();
 
-    if (request_disk_state->next_scan_range_to_start() == NULL &&
+    if (request_disk_state->next_scan_range_to_start() == nullptr &&
         !request_disk_state->unstarted_scan_ranges()->empty()) {
       // We don't have a range queued for this disk for what the caller should
       // read next. Populate that.  We want to have one range waiting to minimize
@@ -914,7 +898,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
       if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
         // All the ranges have been started, notify everyone blocked on GetNextRange.
-        // Only one of them will get work so make sure to return NULL to the other
+        // Only one of them will get work so make sure to return nullptr to the other
         // caller threads.
         (*request_context)->ready_to_start_ranges_cv_.notify_all();
       } else {
@@ -943,7 +927,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     }
     DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
     *range = request_disk_state->in_flight_ranges()->Dequeue();
-    DCHECK(*range != NULL);
+    DCHECK(*range != nullptr);
 
     // Now that we've picked a request range, put the context back on the queue so
     // another thread can pick up another request range for this context.
@@ -987,13 +971,13 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
   DCHECK_GT(state.num_threads_in_op(), 0);
-  DCHECK(buffer->buffer_ != NULL);
+  DCHECK(buffer->buffer_ != nullptr);
 
   if (reader->state_ == DiskIoRequestContext::Cancelled) {
     state.DecrementRequestThreadAndCheckDone(reader);
     DCHECK(reader->Validate()) << endl << reader->DebugString();
     if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
-    buffer->buffer_ = NULL;
+    buffer->buffer_ = nullptr;
     buffer->scan_range_->Cancel(reader->status_);
     // Enqueue the buffer to use the scan range's buffer cleanup path.
     buffer->scan_range_->EnqueueBuffer(buffer);
@@ -1001,7 +985,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   }
 
   DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
-  DCHECK(buffer->buffer_ != NULL);
+  DCHECK(buffer->buffer_ != nullptr);
 
   // Update the reader's scan ranges.  There are a three cases here:
   //  1. Read error
@@ -1010,7 +994,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   if (!buffer->status_.ok()) {
     // Error case
     if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
-    buffer->buffer_ = NULL;
+    buffer->buffer_ = nullptr;
     buffer->eosr_ = true;
     --state.num_remaining_ranges();
     buffer->scan_range_->Cancel(buffer->status_);
@@ -1051,8 +1035,8 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   //   3. Perform the read or write as specified.
   // Cancellation checking needs to happen in both steps 1 and 3.
   while (true) {
-    DiskIoRequestContext* worker_context = NULL;;
-    RequestRange* range = NULL;
+    DiskIoRequestContext* worker_context = nullptr;;
+    RequestRange* range = nullptr;
 
     if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
       DCHECK(shut_down_);
@@ -1076,21 +1060,21 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
-  BufferDescriptor* buffer_desc = NULL;
+  BufferDescriptor* buffer_desc = nullptr;
   if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
     buffer_desc = GetBufferDesc(
-        reader, NULL, range, range->client_buffer_.data, range->client_buffer_.len);
+        reader, nullptr, range, range->client_buffer_.data, range->client_buffer_.len);
   } else {
     // Need to allocate a buffer to read into.
     int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
     buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
-    if (buffer_desc == NULL) return;
+    if (buffer_desc == nullptr) return;
   }
   reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
-  buffer_desc->status_ = range->Open();
+  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
   if (buffer_desc->status_.ok()) {
     // Update counters.
     if (reader->active_read_thread_counter_) {
@@ -1107,7 +1091,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
         &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
-    if (reader->bytes_read_counter_ != NULL) {
+    if (reader->bytes_read_counter_ != nullptr) {
       COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
     }
 
@@ -1124,7 +1108,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
 DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
     DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range,
     int64_t buffer_size) {
-  DCHECK(reader->mem_tracker_ != NULL);
+  DCHECK(reader->mem_tracker_ != nullptr);
   bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
   if (!enough_memory) {
     // Low memory, GC all the buffers and try again.
@@ -1142,7 +1126,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
       state.DecrementRequestThreadAndCheckDone(reader);
       range->Cancel(reader->status_);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return NULL;
+      return nullptr;
     }
 
     if (!range->ready_buffers_.empty()) {
@@ -1151,7 +1135,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
       range->blocked_on_queue_ = true;
       reader->blocked_ranges_.Enqueue(range);
       state.DecrementRequestThread();
-      return NULL;
+      return nullptr;
     } else {
       // We need to get a buffer anyway since there are none queued. The query
       // is likely to fail due to mem limits but there's nothing we can do about that
@@ -1159,13 +1143,13 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
     }
   }
   BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
-  DCHECK(buffer_desc != NULL);
+  DCHECK(buffer_desc != nullptr);
   return buffer_desc;
 }
 
 void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
   Status ret_status = Status::OK();
-  FILE* file_handle = NULL;
+  FILE* file_handle = nullptr;
   // Raw open() syscall will create file if not present when passed these flags.
   int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
   if (fd < 0) {
@@ -1174,14 +1158,14 @@ void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_ra
                                      write_range->file_, errno, GetStrErrMsg())));
   } else {
     file_handle = fdopen(fd, "wb");
-    if (file_handle == NULL) {
+    if (file_handle == nullptr) {
       ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
           Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
                                        GetStrErrMsg())));
     }
   }
 
-  if (file_handle != NULL) {
+  if (file_handle != nullptr) {
     ret_status = WriteRangeHelper(file_handle, write_range);
 
     int success = fclose(file_handle);
@@ -1214,7 +1198,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
         Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
         write_range->len_, write_range->file_, errno, GetStrErrMsg())));
   }
-  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != NULL) {
+  if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
     ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
   }
 
@@ -1260,59 +1244,35 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
   return disk_id % num_local_disks();
 }
 
-DiskIoMgr::HdfsCachedFileHandle* DiskIoMgr::OpenHdfsFile(const hdfsFS& fs,
-    const char* fname, int64_t mtime) {
-  HdfsCachedFileHandle* fh = NULL;
-
-  // Check if a cached file handle exists and validate the mtime, if the mtime of the
-  // cached handle is not matching the mtime of the requested file, reopen.
-  if (detail::is_file_handle_caching_enabled() && file_handle_cache_.Pop(fname, &fh)) {
-    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L);
-    if (fh->mtime() == mtime) {
-      ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
-      ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
-      ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
-      return fh;
-    }
-    VLOG_FILE << "mtime mismatch, closing cached file handle. Closing file=" << fname;
-    delete fh;
-  }
-
-  // Update cache hit ratio
-  ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L);
-  ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L);
-  fh = new HdfsCachedFileHandle(fs, fname, mtime);
-
-  // Check if the file handle was opened correctly
-  if (!fh->ok())  {
-    VLOG_FILE << "Opening the file " << fname << " failed.";
-    delete fh;
-    return NULL;
+HdfsFileHandle* DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs,
+    std::string* fname, int64_t mtime, DiskIoRequestContext *reader) {
+  bool cache_hit;
+  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, false,
+      &cache_hit);
+  if (!fh) return nullptr;
+  if (cache_hit) {
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L);
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L);
+    reader->cached_file_handles_hit_count_.Add(1L);
+  } else {
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L);
+    ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L);
+    reader->cached_file_handles_miss_count_.Add(1L);
   }
-
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L);
   return fh;
 }
 
-void DiskIoMgr::CacheOrCloseFileHandle(const char* fname,
-    DiskIoMgr::HdfsCachedFileHandle* fid, bool close) {
-  ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L);
-  // Try to unbuffer the handle, on filesystems that do not support this call a non-zero
-  // return code indicates that the operation was not successful and thus the file is
-  // closed.
-  if (detail::is_file_handle_caching_enabled() &&
-      !close && hdfsUnbufferFile(fid->file()) == 0) {
-    // Clear read statistics before returning
-    hdfsFileClearReadStatistics(fid->file());
-    file_handle_cache_.Put(fname, fid);
-    ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L);
-  } else {
-    if (close) {
-      VLOG_FILE << "Closing file=" << fname;
-    } else {
-      VLOG_FILE << "FS does not support file handle unbuffering, closing file="
-                << fname;
-    }
-    delete fid;
-  }
+void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, HdfsFileHandle* fid) {
+  file_handle_cache_.ReleaseFileHandle(fname, fid, false);
+}
+
+Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname,
+    int64_t mtime, HdfsFileHandle** fid) {
+  bool dummy;
+  file_handle_cache_.ReleaseFileHandle(fname, *fid, true);
+  HdfsFileHandle* fh = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
+      &dummy);
+  if (!fh) return Status(GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
+  *fid = fh;
+  return Status::OK();
 }