You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by as...@apache.org on 2020/06/29 15:44:41 UTC

[impala] branch master updated: IMPALA-9829: Add Write Metrics for Spilling

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c1b63  IMPALA-9829: Add Write Metrics for Spilling
94c1b63 is described below

commit 94c1b6354b9da14db7ea9e59f09fcb06bae2acfe
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Mon Jun 15 18:14:25 2020 -0700

    IMPALA-9829: Add Write Metrics for Spilling
    
    Currently, we have read metrics for spilling. This patch adds
    support for write metrics. The new metrics can be used to measure
    write operations and to diagnose performance issues when spilling
    to remote disks (S3) (IMPALA-9828).
    
    The added metrics record the following information:
    1. write latency of each write operation to the disk, metric kind:
    HistogramMetric, unit: nanosecond.
    2. write size of each write operation to the disk, metric kind:
    HistogramMetric, unit: Bytes.
    3. number of write IO errors when writing to the disk, metric kind:
    IntCounter.
    
    Testing:
     * added DiskIoMgrTest.MetricsOfWriteSizeAndLatency
     * added DiskIoMgrTest.MetricsOfWriteIoError
    Ran unit test disk-io-mgr-test and pre-commit test
    
    Change-Id: I152b9c5339cedabe33f8873a2bbf651aa5dbb914
    Reviewed-on: http://gerrit.cloudera.org:8080/16083
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr-internal.h |  24 ++++++
 be/src/runtime/io/disk-io-mgr-test.cc    | 132 ++++++++++++++++++++++++++++++-
 be/src/runtime/io/disk-io-mgr.cc         |  80 +++++++++++++++----
 be/src/runtime/io/disk-io-mgr.h          |   2 +
 be/src/util/histogram-metric.h           |   5 ++
 common/thrift/metrics.json               |  30 +++++++
 6 files changed, 258 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 426e406..351de68 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -101,9 +101,24 @@ class DiskQueue {
     DCHECK(read_size_ == nullptr);
     read_size_ = read_size;
   }
+  void set_write_latency(HistogramMetric* write_latency) {
+    DCHECK(write_latency_ == nullptr);
+    write_latency_ = write_latency;
+  }
+  void set_write_size(HistogramMetric* write_size) {
+    DCHECK(write_size_ == nullptr);
+    write_size_ = write_size;
+  }
+  void set_write_io_err(IntCounter* write_io_err) {
+    DCHECK(write_io_err_ == nullptr);
+    write_io_err_ = write_io_err;
+  }
 
   HistogramMetric* read_latency() const { return read_latency_; }
   HistogramMetric* read_size() const { return read_size_; }
+  HistogramMetric* write_latency() const { return write_latency_; }
+  HistogramMetric* write_size() const { return write_size_; }
+  IntCounter* write_io_err() const { return write_io_err_; }
 
  private:
   /// Called from the disk thread to get the next range to process. Wait until a scan
@@ -121,6 +136,15 @@ class DiskQueue {
   /// Metric that tracks read size for this queue.
   HistogramMetric* read_size_ = nullptr;
 
+  /// Metric that tracks write latency for this queue.
+  HistogramMetric* write_latency_ = nullptr;
+
+  /// Metric that tracks write size for this queue.
+  HistogramMetric* write_size_ = nullptr;
+
+  /// Metric that tracks write io errors for this queue.
+  IntCounter* write_io_err_ = nullptr;
+
   /// Lock that protects below members.
   std::mutex lock_;
 
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index f549e19..87eea08 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -23,9 +23,10 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/io/cache-reader-test-stub.h"
-#include "runtime/io/local-file-system-with-fault-injection.h"
+#include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/local-file-system-with-fault-injection.h"
 #include "runtime/io/request-context.h"
 #include "runtime/test-env.h"
 #include "testutil/gtest-util.h"
@@ -33,6 +34,8 @@
 #include "testutil/scoped-flag-setter.h"
 #include "util/condition-variable.h"
 #include "util/debug-util.h"
+#include "util/filesystem-util.h"
+#include "util/histogram-metric.h"
 #include "util/thread.h"
 #include "util/time.h"
 
@@ -1640,5 +1643,132 @@ TEST_F(DiskIoMgrTest, BufferSizeSelection) {
   EXPECT_EQ(vector<int64_t>({MIN_BUFFER_SIZE}),
       io_mgr.ChooseBufferSizes(MIN_BUFFER_SIZE - 7, 3 * MAX_BUFFER_SIZE));
 }
+
+// Issue a number of writes, then check that the metrics recorded
+// the write operations.
+TEST_F(DiskIoMgrTest, MetricsOfWriteSizeAndLatency) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  num_ranges_written_ = 0;
+  string tmp_file = "/tmp/disk_io_mgr_test.txt";
+  int num_ranges = 100;
+  int64_t file_size = 1024 * 1024;
+  int64_t cur_offset = 0;
+  int num_disks = 5;
+  int success = CreateTempFile(tmp_file.c_str(), file_size);
+  if (success != 0) {
+    LOG(ERROR) << "Error creating temp file " << tmp_file.c_str() << " of size "
+               << file_size;
+    EXPECT_TRUE(false);
+  }
+
+  // Reset the Metric if it exists.
+  for (int i = 0; i < num_disks; i++) {
+    string key_prefix = "impala-server.io-mgr.queue-";
+    string write_size_postfix = ".write-size";
+    string write_latency_postfix = ".write-latency";
+    string i_str = std::to_string(i);
+    auto write_size_org =
+        ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+            key_prefix + i_str + write_size_postfix);
+    auto write_latency_org =
+        ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+            key_prefix + i_str + write_latency_postfix);
+    if (write_size_org != nullptr) write_size_org->Reset();
+    if (write_latency_org != nullptr) write_latency_org->Reset();
+  }
+
+  WriteRange::WriteDoneCallback callback = [=](const Status& status) {
+    lock_guard<mutex> l(written_mutex_);
+    ++num_ranges_written_;
+    if (num_ranges_written_ == num_ranges) writes_done_.NotifyOne();
+  };
+
+  // Issue a number of writes to the disks.
+  ObjectPool tmp_pool;
+  DiskIoMgr io_mgr(num_disks, 1, 1, 1, 10);
+  ASSERT_OK(io_mgr.Init());
+  unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
+  for (int i = 0; i < num_ranges; ++i) {
+    int32_t* data = tmp_pool.Add(new int32_t);
+    *data = rand();
+    WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+    *new_range =
+        tmp_pool.Add(new WriteRange(tmp_file, cur_offset, i % num_disks, callback));
+    (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+    cur_offset += sizeof(int32_t);
+    ASSERT_OK(writer->AddWriteRange(*new_range));
+  }
+  {
+    unique_lock<mutex> lock(written_mutex_);
+    while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock);
+  }
+
+  // Check the count and max/min of the histogram metric.
+  size_t write_size_len = sizeof(int32_t);
+  auto exam_fuc = [&](HistogramMetric* metric, const string& keyname) {
+    uint64_t total_cnt = metric->TotalCount();
+    uint64_t min_value = metric->MinValue();
+    uint64_t max_value = metric->MaxValue();
+    // The count should be added by num_ranges/num_disks per disk.
+    EXPECT_EQ(total_cnt, num_ranges / num_disks);
+    // Check if the min and max of write size are the same as the written len.
+    if (keyname == "write_size") {
+      EXPECT_EQ(min_value, write_size_len);
+      EXPECT_EQ(max_value, write_size_len);
+    }
+  };
+
+  for (int i = 0; i < num_disks; i++) {
+    auto write_size = io_mgr.disk_queues_[i]->write_size();
+    auto write_latency = io_mgr.disk_queues_[i]->write_latency();
+    exam_fuc(write_size, "write_size");
+    exam_fuc(write_latency, "write_latency");
+  }
+
+  num_ranges_written_ = 0;
+  io_mgr.UnregisterContext(writer.get());
+}
+
+// Issue a write to a non-existent tmp file path and verify
+// that the write IO error is recorded.
+TEST_F(DiskIoMgrTest, MetricsOfWriteIoError) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  num_ranges_written_ = 0;
+  string tmp_file = "/non-existent/file.txt";
+
+  // Reset the Metric if it exists.
+  auto write_io_err = ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<IntCounter>(
+      "impala-server.io-mgr.queue-0.write-io-error");
+  if (write_io_err != nullptr) write_io_err->SetValue(0);
+
+  vector<string> tmp_path;
+  tmp_path.push_back(tmp_file);
+  // Remove the path in case it exists.
+  Status rm_status = FileSystemUtil::RemovePaths(tmp_path);
+  ObjectPool tmp_pool;
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
+  ASSERT_OK(io_mgr.Init());
+  unique_ptr<RequestContext> writer = io_mgr.RegisterContext();
+  int32_t* data = tmp_pool.Add(new int32_t);
+  *data = rand();
+  WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+  WriteRange::WriteDoneCallback callback = [=](const Status& status) {
+    ASSERT_EQ(TErrorCode::DISK_IO_ERROR, status.code());
+    num_ranges_written_ = 1;
+    writes_done_.NotifyOne();
+  };
+  *new_range = tmp_pool.Add(new WriteRange(tmp_file, 0, 0, callback));
+  (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+  EXPECT_OK(writer->AddWriteRange(*new_range));
+  {
+    unique_lock<mutex> lock(written_mutex_);
+    while (num_ranges_written_ < 1) writes_done_.Wait(lock);
+  }
+  // One IO Error should be added to the metrics counter.
+  EXPECT_EQ(write_io_err->GetValue(), 1);
+
+  num_ranges_written_ = 0;
+  io_mgr.UnregisterContext(writer.get());
+}
 }
 }
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 31321c7..da35039 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -162,6 +162,12 @@ static const char* READ_LATENCY_METRIC_KEY_TEMPLATE =
     "impala-server.io-mgr.queue-$0.read-latency";
 static const char* READ_SIZE_METRIC_KEY_TEMPLATE =
     "impala-server.io-mgr.queue-$0.read-size";
+static const char* WRITE_LATENCY_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.write-latency";
+static const char* WRITE_SIZE_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.write-size";
+static const char* WRITE_IO_ERR_METRIC_KEY_TEMPLATE =
+    "impala-server.io-mgr.queue-$0.write-io-error";
 
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
@@ -302,9 +308,13 @@ Status DiskIoMgr::Init() {
       ImpaladMetrics::IO_MGR_METRICS->AddProperty<string>(
           DEVICE_NAME_METRIC_KEY_TEMPLATE, device_name, i_string);
     }
-    int64_t ONE_HOUR_IN_NS = 60L * 60L * NANOS_PER_SEC;
+
     HistogramMetric* read_latency = nullptr;
     HistogramMetric* read_size = nullptr;
+    HistogramMetric* write_latency = nullptr;
+    HistogramMetric* write_size = nullptr;
+    IntCounter* write_io_err = nullptr;
+
     if (TestInfo::is_test()) {
       read_latency =
         ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
@@ -312,15 +322,47 @@ Status DiskIoMgr::Init() {
       read_size =
         ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
             Substitute(READ_SIZE_METRIC_KEY_TEMPLATE, i_string));
+      write_latency =
+          ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+              Substitute(WRITE_LATENCY_METRIC_KEY_TEMPLATE, i_string));
+      write_size = ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>(
+          Substitute(WRITE_SIZE_METRIC_KEY_TEMPLATE, i_string));
+      write_io_err = ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<IntCounter>(
+          Substitute(WRITE_IO_ERR_METRIC_KEY_TEMPLATE, i_string));
     }
-    disk_queues_[i]->set_read_latency(read_latency != nullptr ? read_latency :
-            ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
-                MetricDefs::Get(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string),
-                ONE_HOUR_IN_NS, 3)));
+
+    int64_t ONE_HOUR_IN_NS = 60L * 60L * NANOS_PER_SEC;
     int64_t ONE_GB = 1024L * 1024L * 1024L;
-    disk_queues_[i]->set_read_size(read_size != nullptr ? read_size :
-            ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
-                MetricDefs::Get(READ_SIZE_METRIC_KEY_TEMPLATE, i_string), ONE_GB, 3)));
+
+    if (read_latency == nullptr) {
+      read_latency = ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(
+          new HistogramMetric(MetricDefs::Get(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string),
+              ONE_HOUR_IN_NS, 3));
+    }
+    if (read_size == nullptr) {
+      read_size = ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
+          MetricDefs::Get(READ_SIZE_METRIC_KEY_TEMPLATE, i_string), ONE_GB, 3));
+    }
+    if (write_latency == nullptr) {
+      write_latency = ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
+          MetricDefs::Get(WRITE_LATENCY_METRIC_KEY_TEMPLATE, i_string), ONE_HOUR_IN_NS,
+          3));
+    }
+    if (write_size == nullptr) {
+      write_size = ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric(
+          MetricDefs::Get(WRITE_SIZE_METRIC_KEY_TEMPLATE, i_string), ONE_GB, 3));
+    }
+    if (write_io_err == nullptr) {
+      write_io_err = ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(
+          new IntCounter(MetricDefs::Get(WRITE_IO_ERR_METRIC_KEY_TEMPLATE, i_string), 0));
+    }
+
+    disk_queues_[i]->set_read_latency(read_latency);
+    disk_queues_[i]->set_read_size(read_size);
+    disk_queues_[i]->set_write_latency(write_latency);
+    disk_queues_[i]->set_write_size(write_size);
+    disk_queues_[i]->set_write_io_err(write_io_err);
+
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;
       ss << "work-loop(Disk: " << device_name << ", Thread: " << j << ")";
@@ -514,16 +556,26 @@ void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
   Status ret_status = Status::OK();
   FILE* file_handle = nullptr;
   Status close_status = Status::OK();
-  ret_status = local_file_system_->OpenForWrite(write_range->file(), O_RDWR | O_CREAT,
-      S_IRUSR | S_IWUSR, &file_handle);
-  if (!ret_status.ok()) goto end;
+  DiskQueue* queue = disk_queues_[write_range->disk_id()];
 
-  ret_status = WriteRangeHelper(file_handle, write_range);
+  {
+    ScopedHistogramTimer write_timer(queue->write_latency());
+    ret_status = local_file_system_->OpenForWrite(
+        write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR, &file_handle);
+    if (!ret_status.ok()) goto end;
 
-  close_status = local_file_system_->Fclose(file_handle, write_range);
-  if (ret_status.ok() && !close_status.ok()) ret_status = close_status;
+    ret_status = WriteRangeHelper(file_handle, write_range);
+
+    close_status = local_file_system_->Fclose(file_handle, write_range);
+    if (ret_status.ok() && !close_status.ok()) ret_status = close_status;
+  }
 
 end:
+  if (ret_status.ok()) {
+    queue->write_size()->Update(write_range->len());
+  } else {
+    queue->write_io_err()->Increment(1);
+  }
   writer_context->WriteDone(write_range, ret_status);
 }
 
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 165f769..b26f910 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -380,6 +380,8 @@ class DiskIoMgr : public CacheLineAligned {
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_BufferSizeSelection_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
+  friend class DiskIoMgrTest_MetricsOfWriteSizeAndLatency_Test;
+  friend class DiskIoMgrTest_MetricsOfWriteIoError_Test;
 
   /////////////////////////////////////////
   /// BEGIN: private members that are accessed by other io:: classes
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index 2e6ca7f..f1231b2 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -46,6 +46,11 @@ class HistogramMetric : public Metric {
     histogram_->Increment(val);
   }
 
+  uint64_t MinValue() const { return histogram_->MinValue(); }
+  uint64_t MaxValue() const { return histogram_->MaxValue(); }
+  uint64_t TotalCount() const { return histogram_->TotalCount(); }
+  uint64_t TotalSum() const { return histogram_->TotalSum(); }
+
   /// Reset the histogram by removing all previous entries.
   void Reset();
 
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 932b2c3..cc24166 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -640,6 +640,36 @@
     "key": "impala-server.io-mgr.queue-$0.read-size"
   },
   {
+    "description": "Histogram of write operation times on disk.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Write Latency Histogram",
+    "units": "TIME_NS",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.io-mgr.queue-$0.write-latency"
+  },
+  {
+    "description": "Histogram of write operation sizes on disk.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Write Size Histogram",
+    "units": "BYTES",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.io-mgr.queue-$0.write-size"
+  },
+  {
+    "description": "The number of write io errors on disk.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Write IO Error Count",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.queue-$0.write-io-error"
+  },
+  {
     "description": "The number of HDFS files currently open for writing.",
     "contexts": [
       "IMPALAD"