You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/10/09 21:06:40 UTC

[impala] 01/04: IMPALA-8884: track storage read statistics per queue

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ae7a9189081b6ade267198873bd65fcb4bd90001
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Sep 2 23:53:22 2019 -0700

    IMPALA-8884: track storage read statistics per queue
    
    Adds the following metrics for each queue:
    * impala-server.io-mgr.queue-$i.device-name
    * impala-server.io-mgr.queue-$i.read-latency
    * impala-server.io-mgr.queue-$i.read-size
    
    I also looked at adding metrics for open operations,
    but the plumbing got messy since the code paths
    where hdfsOpen() is invoked are more numerous and
    complex (e.g. HDFS caching does it outside of a
    disk thread).
    
    Perf:
    Histograms use atomic operations, so lock contention
    isn't an issue.
    
    Ran a TPC-H benchmark at scale factor 30 locally
    and saw no perf change.
    
    Change-Id: I8233ed02b418f22f1d0c031e378288357796f4b4
    Reviewed-on: http://gerrit.cloudera.org:8080/14242
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                 |   4 +
 be/src/runtime/io/cache-reader-test-stub.h |   2 +-
 be/src/runtime/io/disk-io-mgr-internal.h   |  20 ++++
 be/src/runtime/io/disk-io-mgr.cc           |  43 ++++++++-
 be/src/runtime/io/file-reader.h            |   5 +-
 be/src/runtime/io/hdfs-file-reader.cc      |  24 ++---
 be/src/runtime/io/hdfs-file-reader.h       |   7 +-
 be/src/runtime/io/local-file-reader.cc     |  17 ++--
 be/src/runtime/io/local-file-reader.h      |   7 +-
 be/src/runtime/io/request-ranges.h         |  10 +-
 be/src/runtime/io/scan-range.cc            |  17 ++--
 be/src/runtime/test-env.cc                 |   2 +
 be/src/service/impala-server.cc            |   4 -
 be/src/util/histogram-metric.h             |  15 ++-
 be/src/util/impalad-metrics.cc             | 150 +++++++++++++++--------------
 be/src/util/impalad-metrics.h              |   1 +
 common/thrift/metrics.json                 |  30 ++++++
 17 files changed, 240 insertions(+), 118 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 600ce73..c82ec72 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -58,6 +58,7 @@
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
 #include "util/hdfs-bulk-ops.h"
+#include "util/impalad-metrics.h"
 #include "util/mem-info.h"
 #include "util/memory-metrics.h"
 #include "util/metrics.h"
@@ -328,6 +329,9 @@ Status ExecEnv::Init() {
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
   RETURN_IF_ERROR(RegisterMemoryMetrics(
       metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
+  // Initialize impalad metrics
+  ImpaladMetrics::CreateMetrics(
+      exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));
 
   // Resolve hostname to IP address.
   RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_));
diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
index b53b30b..e9d5b11 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -41,7 +41,7 @@ public:
     return Status::OK();
   }
 
-  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+  virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
       int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
     DCHECK(false);
     return Status("Not implemented");
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 3aa43b4..db176d8 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -50,6 +50,8 @@ DECLARE_uint64(max_cached_file_handles);
  } while (false);
 
 namespace impala {
+class HistogramMetric;
+
 namespace io {
 
 // Indicates if file handle caching should be used
@@ -91,6 +93,18 @@ class DiskQueue {
   /// Append debug string to 'ss'. Acquires the DiskQueue lock.
   void DebugString(std::stringstream* ss);
 
+  void set_read_latency(HistogramMetric* read_latency) {
+    DCHECK(read_latency_ == nullptr);
+    read_latency_ = read_latency;
+  }
+  void set_read_size(HistogramMetric* read_size) {
+    DCHECK(read_size_ == nullptr);
+    read_size_ = read_size;
+  }
+
+  HistogramMetric* read_latency() const { return read_latency_; }
+  HistogramMetric* read_size() const { return read_size_; }
+
  private:
   /// Called from the disk thread to get the next range to process. Wait until a scan
   /// is available to process, a write range is available, or 'shut_down_' is set to
@@ -101,6 +115,12 @@ class DiskQueue {
   /// Disk id (0-based)
   const int disk_id_;
 
+  /// Metric that tracks read latency for this queue.
+  HistogramMetric* read_latency_ = nullptr;
+
+  /// Metric that tracks read size for this queue.
+  HistogramMetric* read_size_ = nullptr;
+
   /// Lock that protects below members.
   boost::mutex lock_;
 
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index ed6b595..5a8b396 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -29,11 +29,13 @@
 
 #include "gutil/strings/substitute.h"
 #include "util/bit-util.h"
+#include "util/collection-metrics.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
-#include "util/collection-metrics.h"
+#include "util/histogram-metric.h"
 #include "util/metrics.h"
+#include "util/test-info.h"
 #include "util/time.h"
 
 #ifndef NDEBUG
@@ -146,6 +148,13 @@ DEFINE_bool(cache_remote_file_handles, true, "Enable the file handle cache for "
 DEFINE_bool(cache_s3_file_handles, true, "Enable the file handle cache for "
     "S3 files.");
 
+static const char* DEVICE_NAME_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.device-name";
+static const char* READ_LATENCY_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.read-latency";
+static const char* READ_SIZE_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.read-size";
+
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
 string DiskIoMgr::DebugString() {
@@ -271,6 +280,36 @@ Status DiskIoMgr::Init() {
       // During tests, i may not point to an existing disk.
       device_name = i < DiskInfo::num_disks() ? DiskInfo::device_name(i) : to_string(i);
     }
+    const string& i_string = Substitute("$0", i);
+
+    // Unit tests may create multiple DiskIoMgrs, so we need to avoid re-registering the
+    // same metrics.
+    if (!TestInfo::is_test()
+        || ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<StringProperty>(
+               Substitute(DEVICE_NAME_METRIC_KEY_TEMPLATE, i_string))
+            == nullptr) {
+      ImpaladMetrics::IO_MGR_METRICS->AddProperty<string>(
+          DEVICE_NAME_METRIC_KEY_TEMPLATE, device_name, i_string);
+    }
+    int64_t ONE_HOUR_IN_NS = 60L * 60L * NANOS_PER_SEC;
+    HistogramMetric* read_latency = nullptr;
+    HistogramMetric* read_size = nullptr;
+    if (TestInfo::is_test()) {
+      read_latency =
+        ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+            Substitute(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string));
+      read_size =
+        ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+            Substitute(READ_SIZE_METRIC_KEY_TEMPLATE, i_string));
+    }
+    disk_queues_[i]->set_read_latency(read_latency != nullptr ? read_latency :
+            ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
+                MetricDefs::Get(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string),
+                ONE_HOUR_IN_NS, 3)));
+    int64_t ONE_GB = 1024L * 1024L * 1024L;
+    disk_queues_[i]->set_read_size(read_size != nullptr ? read_size :
+            ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
+                MetricDefs::Get(READ_SIZE_METRIC_KEY_TEMPLATE, i_string), ONE_GB, 3)));
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;
       ss << "work-loop(Disk: " << device_name << ", Thread: " << j << ")";
@@ -446,7 +485,7 @@ void DiskQueue::DiskThreadLoop(DiskIoMgr* io_mgr) {
 
     if (range->request_type() == RequestType::READ) {
       ScanRange* scan_range = static_cast<ScanRange*>(range);
-      ReadOutcome outcome = scan_range->DoRead(disk_id_);
+      ReadOutcome outcome = scan_range->DoRead(this, disk_id_);
       worker_context->ReadDone(disk_id_, outcome, scan_range);
     } else {
       DCHECK(range->request_type() == RequestType::WRITE);
diff --git a/be/src/runtime/io/file-reader.h b/be/src/runtime/io/file-reader.h
index 9dbcc31..7d46529 100644
--- a/be/src/runtime/io/file-reader.h
+++ b/be/src/runtime/io/file-reader.h
@@ -27,6 +27,7 @@ namespace impala {
 namespace io {
 
 class DiskIoMgr;
+class DiskQueue;
 class RequestContext;
 class ScanRange;
 
@@ -47,7 +48,9 @@ public:
   /// Reads bytes from given position ('file_offset'). Tries to read
   /// 'bytes_to_read' amount of bytes. 'bytes_read' contains the number of
   /// bytes actually read. 'eof' is set to true when end of file has reached.
-  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+  /// Metrics in 'queue' are updated with the size and latencies of the read
+  /// operations on the underlying file system.
+  virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
       int64_t bytes_to_read, int64_t* bytes_read, bool* eof) = 0;
 
   /// ***Currently only for HDFS***
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 629c09c..14cf2b5 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -24,6 +24,7 @@
 #include "runtime/io/request-context.h"
 #include "runtime/io/request-ranges.h"
 #include "util/hdfs-util.h"
+#include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
 #include "util/pretty-printer.h"
@@ -71,7 +72,7 @@ Status HdfsFileReader::Open(bool use_file_handle_cache) {
   return Status::OK();
 }
 
-Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
+Status HdfsFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
     int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
@@ -137,7 +138,7 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
 
       // ReadFromPosInternal() might fail due to a bad file handle.
       // If that was the case, allow for a retry to fix it.
-      status = ReadFromPosInternal(hdfs_file, position_in_file,
+      status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
           buffer + *bytes_read, chunk_size, &current_bytes_read);
 
       // Retry if:
@@ -148,15 +149,14 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
         // The error may be due to a bad file handle. Reopen the file handle and retry.
         // Exclude this time from the read timers.
         req_context_read_timer.Stop();
-        RETURN_IF_ERROR(io_mgr->ReopenCachedHdfsFileHandle(hdfs_fs_,
-                scan_range_->file_string(), scan_range_->mtime(),
-                request_context, &borrowed_hdfs_fh));
+        RETURN_IF_ERROR(
+            io_mgr->ReopenCachedHdfsFileHandle(hdfs_fs_, scan_range_->file_string(),
+                scan_range_->mtime(), request_context, &borrowed_hdfs_fh));
         hdfs_file = borrowed_hdfs_fh->file();
-        VLOG_FILE << "Reopening file " << scan_range_->file_string()
-                  << " with mtime " << scan_range_->mtime()
-                  << " offset " << file_offset;
+        VLOG_FILE << "Reopening file " << scan_range_->file_string() << " with mtime "
+                  << scan_range_->mtime() << " offset " << file_offset;
         req_context_read_timer.Start();
-        status = ReadFromPosInternal(hdfs_file, position_in_file,
+        status = ReadFromPosInternal(hdfs_file, queue, position_in_file,
             buffer + *bytes_read, chunk_size, &current_bytes_read);
       }
       if (!status.ok()) {
@@ -185,8 +185,10 @@ Status HdfsFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
   return status;
 }
 
-Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
-    uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
+Status HdfsFileReader::ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* queue,
+    int64_t position_in_file, uint8_t* buffer, int64_t chunk_size, int* bytes_read) {
+  queue->read_size()->Update(chunk_size);
+  ScopedHistogramTimer read_timer(queue->read_latency());
   // For file handles from the cache, any of the below file operations may fail
   // due to a bad file handle.
   if (FLAGS_use_hdfs_pread) {
diff --git a/be/src/runtime/io/hdfs-file-reader.h b/be/src/runtime/io/hdfs-file-reader.h
index 01a0394..1e228e2 100644
--- a/be/src/runtime/io/hdfs-file-reader.h
+++ b/be/src/runtime/io/hdfs-file-reader.h
@@ -35,7 +35,7 @@ public:
   ~HdfsFileReader();
 
   virtual Status Open(bool use_file_handle_cache) override;
-  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+  virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* buffer,
       int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   virtual void Close() override;
   virtual void ResetState() override;
@@ -70,8 +70,9 @@ private:
   /// into 'buffer'. Update 'bytes_read' on success. Returns error status on
   /// failure. When not using HDFS pread, this function will always implicitly
   /// seek to 'position_in_file' if 'hdfs_file' is not at it already.
-  Status ReadFromPosInternal(hdfsFile hdfs_file, int64_t position_in_file,
-      uint8_t* buffer, int64_t chunk_size, int* bytes_read);
+  /// 'disk_queue' metrics are updated based on the operation.
+  Status ReadFromPosInternal(hdfsFile hdfs_file, DiskQueue* disk_queue,
+      int64_t position_in_file, uint8_t* buffer, int64_t chunk_size, int* bytes_read);
 
   void GetHdfsStatistics(hdfsFile hdfs_file);
 
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index acdd2ea..7e612c2 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -18,8 +18,10 @@
 #include <algorithm>
 #include <stdio.h>
 
+#include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/local-file-reader.h"
 #include "runtime/io/request-ranges.h"
+#include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
 
@@ -49,8 +51,8 @@ Status LocalFileReader::Open(bool use_file_handle_cache) {
   return Status::OK();
 }
 
-Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
-    int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
+Status LocalFileReader::ReadFromPos(DiskQueue* queue, int64_t file_offset,
+    uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) {
   DCHECK(scan_range_->read_in_flight());
   DCHECK_GE(bytes_to_read, 0);
   // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
@@ -70,11 +72,14 @@ Status LocalFileReader::ReadFromPos(int64_t file_offset, uint8_t* buffer,
     fclose(file_);
     file_ = nullptr;
     return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
-        Substitute("Could not seek to $0 "
-            "for file: $1: $2", scan_range_->offset(),
-            *scan_range_->file_string(), GetStrErrMsg()));
+        Substitute("Could not seek to $0 for file: $1: $2",
+            scan_range_->offset(), *scan_range_->file_string(), GetStrErrMsg()));
+  }
+  queue->read_size()->Update(bytes_to_read);
+  {
+    ScopedHistogramTimer read_timer(queue->read_latency());
+    *bytes_read = fread(buffer, 1, bytes_to_read, file_);
   }
-  *bytes_read = fread(buffer, 1, bytes_to_read, file_);
   DCHECK_GE(*bytes_read, 0);
   DCHECK_LE(*bytes_read, bytes_to_read);
   if (*bytes_read < bytes_to_read) {
diff --git a/be/src/runtime/io/local-file-reader.h b/be/src/runtime/io/local-file-reader.h
index 14cff3c..8d474e9 100644
--- a/be/src/runtime/io/local-file-reader.h
+++ b/be/src/runtime/io/local-file-reader.h
@@ -25,17 +25,18 @@ namespace io {
 /// File reader class for the local file system.
 /// It uses the standard C APIs from stdio.h
 class LocalFileReader : public FileReader {
-public:
+ public:
   LocalFileReader(ScanRange* scan_range) : FileReader(scan_range) {}
   ~LocalFileReader() {}
 
   virtual Status Open(bool use_file_handle_cache) override;
-  virtual Status ReadFromPos(int64_t file_offset, uint8_t* buffer,
+  virtual Status ReadFromPos(DiskQueue* disk_queue, int64_t file_offset, uint8_t* buffer,
       int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override;
   /// We don't cache files of the local file system.
   virtual void CachedFile(uint8_t** data, int64_t* length) override;
   virtual void Close() override;
-private:
+
+ private:
   /// Points to a C FILE object between calls to Open() and Close(), otherwise nullptr.
   FILE* file_ = nullptr;
 };
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 8567da3..65cc3bd 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -351,8 +351,9 @@ class ScanRange : public RequestRange {
 
   /// Called from a disk I/O thread to read the next buffer of data for this range. The
   /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the
-  /// ID of the disk queue. Caller must not hold 'lock_'.
-  ReadOutcome DoRead(int disk_id);
+  /// ID of the disk queue. 'queue' is updated with the sizes and latencies of reads from
+  /// the underlying filesystem. Caller must not hold 'lock_'.
+  ReadOutcome DoRead(DiskQueue* queue, int disk_id);
 
   /// Cleans up a buffer that was not returned to the client.
   /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -442,8 +443,9 @@ class ScanRange : public RequestRange {
 
   /// Read the sub-ranges into buffer and track the current position in 'sub_range_pos_'.
   /// If cached data is available, then memcpy() from it instead of actually reading the
-  /// files.
-  Status ReadSubRanges(BufferDescriptor* buffer, bool* eof);
+  /// files. 'queue' is updated with the latencies and sizes of reads from the underlying
+  /// filesystem.
+  Status ReadSubRanges(DiskQueue* queue, BufferDescriptor* buffer, bool* eof);
 
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index c118c90..d9e3423 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -168,7 +168,7 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
   return result;
 }
 
-ReadOutcome ScanRange::DoRead(int disk_id) {
+ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
 
@@ -220,11 +220,11 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
 
     if (sub_ranges_.empty()) {
       DCHECK(cache_.data == nullptr);
-      read_status = file_reader_->ReadFromPos(offset_ + bytes_read_, buffer_desc->buffer_,
-          min(len() - bytes_read_, buffer_desc->buffer_len_),
+      read_status = file_reader_->ReadFromPos(queue, offset_ + bytes_read_,
+          buffer_desc->buffer_, min(len() - bytes_read_, buffer_desc->buffer_len_),
           &buffer_desc->len_, &eof);
     } else {
-      read_status = ReadSubRanges(buffer_desc.get(), &eof);
+      read_status = ReadSubRanges(queue, buffer_desc.get(), &eof);
     }
 
     COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
@@ -266,10 +266,11 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   return eosr ? ReadOutcome::SUCCESS_EOSR : ReadOutcome::SUCCESS_NO_EOSR;
 }
 
-Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) {
+Status ScanRange::ReadSubRanges(
+    DiskQueue* queue, BufferDescriptor* buffer_desc, bool* eof) {
   buffer_desc->len_ = 0;
-  while (buffer_desc->len() < buffer_desc->buffer_len() &&
-      sub_range_pos_.index < sub_ranges_.size()) {
+  while (buffer_desc->len() < buffer_desc->buffer_len()
+      && sub_range_pos_.index < sub_ranges_.size()) {
     SubRange& sub_range = sub_ranges_[sub_range_pos_.index];
     int64_t offset = sub_range.offset + sub_range_pos_.bytes_read;
     int64_t bytes_to_read = min(sub_range.length - sub_range_pos_.bytes_read,
@@ -280,7 +281,7 @@ Status ScanRange::ReadSubRanges(BufferDescriptor* buffer_desc, bool* eof) {
           cache_.data + offset, bytes_to_read);
     } else {
       int64_t current_bytes_read;
-      Status read_status = file_reader_->ReadFromPos(offset,
+      Status read_status = file_reader_->ReadFromPos(queue, offset,
           buffer_desc->buffer_ + buffer_desc->len(), bytes_to_read, &current_bytes_read,
           eof);
       if (!read_status.ok()) return read_status;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index c8286e7..dad65e1 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -51,6 +51,8 @@ Status TestEnv::Init() {
     static_metrics_.reset(new MetricGroup("test-env-static-metrics"));
     ImpaladMetrics::CreateMetrics(static_metrics_.get());
     RETURN_IF_ERROR(RegisterMemoryMetrics(static_metrics_.get(), true, nullptr, nullptr));
+    ImpaladMetrics::CreateMetrics(
+        static_metrics_->GetOrCreateChildGroup("impala-server"));
   }
 
   exec_env_.reset(new ExecEnv);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a61c187..d02eb71 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -418,10 +418,6 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     ERR_load_crypto_strings();
   }
 
-  // Initialize impalad metrics
-  ImpaladMetrics::CreateMetrics(
-      exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));
-
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));
 
   // Register the catalog update callback if running in a real cluster as a coordinator.
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index 7aac0f1..26d6bb6 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -20,6 +20,7 @@
 #include "util/hdr-histogram.h"
 #include "util/metrics.h"
 #include "util/spinlock.h"
+#include "util/stopwatch.h"
 
 namespace impala {
 
@@ -68,4 +69,16 @@ class HistogramMetric : public Metric {
 
   DISALLOW_COPY_AND_ASSIGN(HistogramMetric);
 };
-}
+
+// Utility class that updates a histogram with the time elapsed over its own lifetime.
+class ScopedHistogramTimer {
+ public:
+  ScopedHistogramTimer(HistogramMetric* metric) : metric_(metric) { sw_.Start(); }
+
+  ~ScopedHistogramTimer() { metric_->Update(sw_.ElapsedTime()); }
+
+ private:
+  HistogramMetric* const metric_;
+  MonotonicStopWatch sw_;
+};
+} // namespace impala
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 4ced62d..1b16bd6 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -137,73 +137,74 @@ const char* ImpaladMetricKeys::HEDGED_READ_OPS_WIN =
 // These are created by impala-server during startup.
 // =======
 // Counters
-IntCounter* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED = NULL;
-IntGauge* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING = NULL;
-IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES = NULL;
-IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS = NULL;
-IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = NULL;
-IntCounter* ImpaladMetrics::NUM_QUERIES_EXPIRED = NULL;
-IntCounter* ImpaladMetrics::NUM_QUERIES_SPILLED = NULL;
-IntCounter* ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID = NULL;
-IntCounter* ImpaladMetrics::NUM_RANGES_PROCESSED = NULL;
-IntCounter* ImpaladMetrics::NUM_SESSIONS_EXPIRED = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = NULL;
-IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = NULL;
-IntCounter* ImpaladMetrics::HEDGED_READ_OPS = NULL;
-IntCounter* ImpaladMetrics::HEDGED_READ_OPS_WIN = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_EVICTION_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_HIT_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_SUCCESS_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_MISS_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_REQUEST_COUNT = NULL;
-IntCounter* ImpaladMetrics::CATALOG_CACHE_TOTAL_LOAD_TIME = NULL;
+IntCounter* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED = nullptr;
+IntGauge* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING = nullptr;
+IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES = nullptr;
+IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS = nullptr;
+IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = nullptr;
+IntCounter* ImpaladMetrics::NUM_QUERIES_EXPIRED = nullptr;
+IntCounter* ImpaladMetrics::NUM_QUERIES_SPILLED = nullptr;
+IntCounter* ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID = nullptr;
+IntCounter* ImpaladMetrics::NUM_RANGES_PROCESSED = nullptr;
+IntCounter* ImpaladMetrics::NUM_SESSIONS_EXPIRED = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_BYTES_WRITTEN = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED = nullptr;
+IntCounter* ImpaladMetrics::HEDGED_READ_OPS = nullptr;
+IntCounter* ImpaladMetrics::HEDGED_READ_OPS_WIN = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_EVICTION_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_HIT_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_SUCCESS_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_MISS_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_REQUEST_COUNT = nullptr;
+IntCounter* ImpaladMetrics::CATALOG_CACHE_TOTAL_LOAD_TIME = nullptr;
 
 // Gauges
-IntGauge* ImpaladMetrics::CATALOG_NUM_DBS = NULL;
-IntGauge* ImpaladMetrics::CATALOG_NUM_TABLES = NULL;
-IntGauge* ImpaladMetrics::CATALOG_VERSION = NULL;
-IntGauge* ImpaladMetrics::CATALOG_TOPIC_VERSION = NULL;
-IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = NULL;
-IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_NUM_BUFFERS = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_NUM_OPEN_FILES = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = NULL;
-IntGauge* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = NULL;
-IntGauge* ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT = NULL;
-IntGauge* ImpaladMetrics::NUM_QUERIES_REGISTERED = NULL;
-IntGauge* ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS = NULL;
-IntGauge* ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES = NULL;
-DoubleGauge* ImpaladMetrics::CATALOG_CACHE_AVG_LOAD_TIME = NULL;
-DoubleGauge* ImpaladMetrics::CATALOG_CACHE_HIT_RATE = NULL;
-DoubleGauge* ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_RATE = NULL;
-DoubleGauge* ImpaladMetrics::CATALOG_CACHE_MISS_RATE = NULL;
+IntGauge* ImpaladMetrics::CATALOG_NUM_DBS = nullptr;
+IntGauge* ImpaladMetrics::CATALOG_NUM_TABLES = nullptr;
+IntGauge* ImpaladMetrics::CATALOG_VERSION = nullptr;
+IntGauge* ImpaladMetrics::CATALOG_TOPIC_VERSION = nullptr;
+IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = nullptr;
+IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = nullptr;
+MetricGroup* ImpaladMetrics::IO_MGR_METRICS = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_NUM_BUFFERS = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_NUM_OPEN_FILES = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = nullptr;
+IntGauge* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = nullptr;
+IntGauge* ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT = nullptr;
+IntGauge* ImpaladMetrics::NUM_QUERIES_REGISTERED = nullptr;
+IntGauge* ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS = nullptr;
+IntGauge* ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES = nullptr;
+DoubleGauge* ImpaladMetrics::CATALOG_CACHE_AVG_LOAD_TIME = nullptr;
+DoubleGauge* ImpaladMetrics::CATALOG_CACHE_HIT_RATE = nullptr;
+DoubleGauge* ImpaladMetrics::CATALOG_CACHE_LOAD_EXCEPTION_RATE = nullptr;
+DoubleGauge* ImpaladMetrics::CATALOG_CACHE_MISS_RATE = nullptr;
 
 // Properties
-BooleanProperty* ImpaladMetrics::CATALOG_READY = NULL;
-BooleanProperty* ImpaladMetrics::IMPALA_SERVER_READY = NULL;
-StringProperty* ImpaladMetrics::IMPALA_SERVER_VERSION = NULL;
-StringProperty* ImpaladMetrics::CATALOG_SERVICE_ID = NULL;
+BooleanProperty* ImpaladMetrics::CATALOG_READY = nullptr;
+BooleanProperty* ImpaladMetrics::IMPALA_SERVER_READY = nullptr;
+StringProperty* ImpaladMetrics::IMPALA_SERVER_VERSION = nullptr;
+StringProperty* ImpaladMetrics::CATALOG_SERVICE_ID = nullptr;
 
 // Histograms
-HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = NULL;
-HistogramMetric* ImpaladMetrics::DDL_DURATIONS = NULL;
+HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr;
+HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr;
 
 // Other
 StatsMetric<uint64_t, StatsType::MEAN>*
-ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO = NULL;
+ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO = nullptr;
 
 void ImpaladMetrics::InitCatalogMetrics(MetricGroup* m) {
   // Initialize catalog metrics
@@ -292,43 +293,44 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
 
   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = m->AddGauge(
+  IO_MGR_METRICS = m->GetOrCreateChildGroup("io-mgr");
+  IO_MGR_NUM_OPEN_FILES = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
-  IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
+  IO_MGR_NUM_CACHED_FILE_HANDLES = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
-  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(
+  IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = m->AddGauge(
+  IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = m->AddGauge(
+  IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT, 0);
 
-  IO_MGR_CACHED_FILE_HANDLES_REOPENED = m->AddCounter(
+  IO_MGR_CACHED_FILE_HANDLES_REOPENED = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_REOPENED, 0);
 
-  IO_MGR_BYTES_READ = m->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
-  IO_MGR_LOCAL_BYTES_READ = m->AddCounter(
+  IO_MGR_BYTES_READ = IO_MGR_METRICS->AddCounter(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0);
+  IO_MGR_LOCAL_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
-  IO_MGR_CACHED_BYTES_READ = m->AddCounter(
+  IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
-  IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddCounter(
+  IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
-  IO_MGR_BYTES_WRITTEN = m->AddCounter(
+  IO_MGR_BYTES_WRITTEN = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0);
 
-  IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = m->AddCounter(
+  IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES, 0);
-  IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = m->AddCounter(
+  IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES, 0);
-  IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = m->AddGauge(
+  IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES = IO_MGR_METRICS->AddGauge(
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_TOTAL_BYTES, 0);
-  IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = m->AddCounter(
+  IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_DROPPED_BYTES, 0);
 
   IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO =
-      StatsMetric<uint64_t, StatsType::MEAN>::CreateAndRegister(m,
+      StatsMetric<uint64_t, StatsType::MEAN>::CreateAndRegister(IO_MGR_METRICS,
       ImpaladMetricKeys::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO);
 
   InitCatalogMetrics(m);
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 6eeadac..4c9146a 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -259,6 +259,7 @@ class ImpaladMetrics {
   static DoubleGauge* CATALOG_CACHE_MISS_RATE;
   static IntGauge* IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS;
   static IntGauge* IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS;
+  static MetricGroup* IO_MGR_METRICS;
   static IntGauge* IO_MGR_NUM_BUFFERS;
   static IntGauge* IO_MGR_NUM_OPEN_FILES;
   static IntGauge* IO_MGR_NUM_UNUSED_BUFFERS;
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 6f75bcf..c5b6378 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -560,6 +560,36 @@
     "key": "impala-server.io-mgr.total-bytes"
   },
   {
+    "description": "IO Manager device name.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Queue Device Name",
+    "units": "NONE",
+    "kind": "PROPERTY",
+    "key": "impala-server.io-mgr.queue-$0.device-name"
+  },
+  {
+    "description": "Histogram of read operation times on disk.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Read Latency Histogram",
+    "units": "TIME_NS",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.io-mgr.queue-$0.read-latency"
+  },
+  {
+    "description": "Histogram of read operation sizes on disk.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Read Size Histogram",
+    "units": "BYTES",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.io-mgr.queue-$0.read-size"
+  },
+  {
     "description": "The number of HDFS files currently open for writing.",
     "contexts": [
       "IMPALAD"