Posted to commits@arrow.apache.org by we...@apache.org on 2020/05/01 21:39:29 UTC

[arrow] branch master updated: ARROW-8562: [C++] IO: Parameterize I/O Coalescing using S3 metrics

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a791c5  ARROW-8562: [C++] IO: Parameterize I/O Coalescing using S3 metrics
8a791c5 is described below

commit 8a791c52433b6fa00686cf3a0e5924b72cfb805e
Author: mayuropensource <ma...@gmail.com>
AuthorDate: Fri May 1 16:38:53 2020 -0500

    ARROW-8562: [C++] IO: Parameterize I/O Coalescing using S3 metrics
    
    _(Recreating the PR from a clean repo; apologies for the earlier PR, which was not cleanly merged.)_
    
    **JIRA:** https://issues.apache.org/jira/browse/ARROW-8562
    
    This change is not actually used until #6744 (@lidavidm) is merged; however, it does not need to wait for that pull request.
    
    **Description:**
    The adaptive I/O coalescing algorithm uses two parameters:
    
        max_io_gap or hole_size_limit: Max I/O gap/hole size in bytes
        ideal_request_size or range_size_limit: Ideal I/O Request size in bytes
    
    These parameters can be derived from S3 metrics as described below:
    
    In S3-compatible storage, there are two main metrics:
    
        Seek-time or Time-To-First-Byte (TTFB) in seconds: the call setup latency of a new S3 request
    
        Transfer Bandwidth (BW) for data in bytes/sec
    
        Computing max_io_gap or hole_size_limit:
    
    max_io_gap = TTFB * BW
    
    This is also called Bandwidth-Delay-Product (BDP).
    
    Two byte ranges that have a gap between them can still be mapped to the same read if the gap is smaller than the bandwidth-delay product [TTFB * TransferBandwidth], i.e. if the Time-To-First-Byte (the call setup latency of a new S3 request) is expected to cost more than just reading and discarding the extra bytes on an existing HTTP request.
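
    For example, with TTFB = 5 ms and BW = 500 MiB/s (the values exercised by the unit test added in this commit), the threshold works out to:

        max_io_gap = 0.005 s * 500 MiB/s = 2.5 MiB

    so any two ranges separated by less than 2.5 MiB are cheaper to serve with a single request.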
    
        Computing ideal_request_size or range_size_limit:
    
    We want high bandwidth utilization per S3 connection, i.e. to transfer large amounts of data to amortize the seek overhead.
    But we also want to leverage parallelism by slicing up very large I/O chunks. We define two more configuration parameters, with suggested default values, to control the slice size and balance these two effects with the goal of maximizing net data-load performance.
    
    BW_util (ideal bandwidth utilization):
    The fraction of per-connection bandwidth that should be utilized to maximize net data load.
    A good default value is 90% (0.9).
    
    MAX_IDEAL_REQUEST_SIZE:
    The maximum size (in bytes) of a single request; computed values larger than this are capped at this limit.
    A good default value is 64 MiB.
    
    To find the amount of data to transfer in a single S3 get_object request so as to achieve the effective bandwidth eff_BW = BW_util * BW, start from:
    eff_BW = ideal_request_size / (TTFB + ideal_request_size / BW)
    
    Substituting TTFB = max_io_gap / BW and eff_BW = BW_util * BW, we get the following result:
    ideal_request_size = max_io_gap * BW_util / (1 - BW_util)
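
    Spelling out the algebra behind that result:

        BW_util * BW * (TTFB + ideal_request_size / BW) = ideal_request_size
        BW_util * max_io_gap + BW_util * ideal_request_size = ideal_request_size
        ideal_request_size * (1 - BW_util) = BW_util * max_io_gap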
    
    Applying the MAX_IDEAL_REQUEST_SIZE, we get the following:
    ideal_request_size = min(MAX_IDEAL_REQUEST_SIZE, max_io_gap * BW_util / (1 - BW_util))
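
    With the suggested defaults (TTFB = 5 ms, BW = 500 MiB/s, BW_util = 0.9, MAX_IDEAL_REQUEST_SIZE = 64 MiB), this gives:

        ideal_request_size = min(64 MiB, 2.5 MiB * 0.9 / (1 - 0.9))
                           = min(64 MiB, 22.5 MiB)
                           = 22.5 MiB

    which matches the expectations in the unit test added below. For reference, a minimal C++ usage sketch of the new factory (nothing beyond the MakeFromNetworkMetrics API added by this commit is assumed):

        #include "arrow/io/caching.h"

        // TTFB = 5 ms, BW = 500 MiB/s; the utilization fraction (0.9) and the
        // request-size cap (64 MiB) fall back to the defaults declared on CacheOptions.
        arrow::io::CacheOptions options =
            arrow::io::CacheOptions::MakeFromNetworkMetrics(
                /*time_to_first_byte_millis=*/5, /*transfer_bandwidth_mib_per_sec=*/500);
        // options.hole_size_limit  == 2621440  bytes (2.5 MiB)
        // options.range_size_limit == 23592960 bytes (22.5 MiB)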
    
    Closes #7022 from mayuropensource/ARROW-8562__parameterize_io_coalescing_with_s3_metrics
    
    Authored-by: mayuropensource <ma...@gmail.com>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/io/caching.cc     | 85 +++++++++++++++++++++++++++++++++++++++++
 cpp/src/arrow/io/caching.h      | 27 +++++++++++++
 cpp/src/arrow/io/memory_test.cc | 24 ++++++++++++
 3 files changed, 136 insertions(+)

diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index ec41845..6a2a19a 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <algorithm>
+#include <cmath>
 #include <utility>
 
 #include "arrow/buffer.h"
@@ -33,6 +34,90 @@ CacheOptions CacheOptions::Defaults() {
                       internal::ReadRangeCache::kDefaultRangeSizeLimit};
 }
 
+CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_millis,
+                                                  int64_t transfer_bandwidth_mib_per_sec,
+                                                  double ideal_bandwidth_utilization_frac,
+                                                  int64_t max_ideal_request_size_mib) {
+  //
+  // The I/O coalescing algorithm uses two parameters:
+  //   1. hole_size_limit (a.k.a max_io_gap): Max I/O gap/hole size in bytes
+  //   2. range_size_limit (a.k.a ideal_request_size): Ideal I/O Request size in bytes
+  //
+  // These parameters can be derived from network metrics (e.g. S3) as described below:
+  //
+  // In an S3 compatible storage, there are two main metrics:
+  //   1. Seek-time or Time-To-First-Byte (TTFB) in seconds: the call setup latency of
+  //      a new S3 request
+  //   2. Transfer Bandwidth (BW) for data in bytes/sec
+  //
+  // 1. Computing hole_size_limit:
+  //
+  //   hole_size_limit = TTFB * BW
+  //
+  //   This is also called Bandwidth-Delay-Product (BDP).
+  //   Two byte ranges that have a gap can still be mapped to the same read
+  //   if the gap is less than the bandwidth-delay product [TTFB * TransferBandwidth],
+  //   i.e. if the Time-To-First-Byte (the call setup latency of a new S3 request) is
+  //   expected to cost more than just reading and discarding the extra bytes on an
+  //   existing HTTP request.
+  //
+  // 2. Computing range_size_limit:
+  //
+  //   We want high bandwidth utilization per S3 connection,
+  //   i.e. to transfer large amounts of data to amortize the seek overhead.
+  //   But we also want to leverage parallelism by slicing up very large I/O chunks.
+  //   We define two more config parameters with suggested default values to control
+  //   the slice size and seek to balance the two effects with the goal of maximizing
+  //   net data load performance.
+  //
+  //   BW_util_frac (ideal bandwidth utilization): Transfer bandwidth utilization fraction
+  //     (per connection) to maximize the net data load. 90% is a good default number for
+  //     an effective transfer bandwidth.
+  //
+  //   MAX_IDEAL_REQUEST_SIZE: The maximum single data request size (in MiB) to maximize
+  //     the net data load. 64 MiB is a good default number for the ideal request size.
+  //
+  //   To find the amount of data to transfer in a single S3 get_object request so as
+  //   to achieve the effective bandwidth eff_BW = BW_util_frac * BW, start from:
+  //     eff_BW = range_size_limit / (TTFB + range_size_limit / BW)
+  //
+  //   Substituting TTFB = hole_size_limit / BW and eff_BW = BW_util_frac * BW, we get the
+  //   following result:
+  //     range_size_limit = hole_size_limit * BW_util_frac / (1 - BW_util_frac)
+  //
+  //   Applying the MAX_IDEAL_REQUEST_SIZE, we get the following:
+  //     range_size_limit = min(MAX_IDEAL_REQUEST_SIZE,
+  //                            hole_size_limit * BW_util_frac / (1 - BW_util_frac))
+  //
+  DCHECK_GT(time_to_first_byte_millis, 0) << "TTFB must be > 0";
+  DCHECK_GT(transfer_bandwidth_mib_per_sec, 0) << "Transfer bandwidth must be > 0";
+  DCHECK_GT(ideal_bandwidth_utilization_frac, 0)
+      << "Ideal bandwidth utilization fraction must be > 0";
+  DCHECK_LT(ideal_bandwidth_utilization_frac, 1.0)
+      << "Ideal bandwidth utilization fraction must be < 1";
+  DCHECK_GT(max_ideal_request_size_mib, 0) << "Max Ideal request size must be > 0";
+
+  const double time_to_first_byte_sec = time_to_first_byte_millis / 1000.0;
+  const int64_t transfer_bandwidth_bytes_per_sec =
+      transfer_bandwidth_mib_per_sec * 1024 * 1024;
+  const int64_t max_ideal_request_size_bytes = max_ideal_request_size_mib * 1024 * 1024;
+
+  // hole_size_limit = TTFB * BW
+  const auto hole_size_limit = static_cast<int64_t>(
+      std::round(time_to_first_byte_sec * transfer_bandwidth_bytes_per_sec));
+  DCHECK_GT(hole_size_limit, 0) << "Computed hole_size_limit must be > 0";
+
+  // range_size_limit = min(MAX_IDEAL_REQUEST_SIZE,
+  //                        hole_size_limit * BW_util_frac / (1 - BW_util_frac))
+  const int64_t range_size_limit = std::min(
+      max_ideal_request_size_bytes,
+      static_cast<int64_t>(std::round(hole_size_limit * ideal_bandwidth_utilization_frac /
+                                      (1 - ideal_bandwidth_utilization_frac))));
+  DCHECK_GT(range_size_limit, 0) << "Computed range_size_limit must be > 0";
+
+  return {hole_size_limit, range_size_limit};
+}
+
 namespace internal {
 
 struct RangeCacheEntry {
diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h
index adc5dca..bdb02cb 100644
--- a/cpp/src/arrow/io/caching.h
+++ b/cpp/src/arrow/io/caching.h
@@ -29,6 +29,9 @@ namespace arrow {
 namespace io {
 
 struct ARROW_EXPORT CacheOptions {
+  static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
+  static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;
+
   /// \brief The maximum distance in bytes between two consecutive
   ///   ranges; beyond this value, ranges are not combined
   int64_t hole_size_limit;
@@ -37,6 +40,30 @@ struct ARROW_EXPORT CacheOptions {
   ///   size greater than this, they are not combined
   int64_t range_size_limit;
 
+  bool operator==(const CacheOptions& other) const {
+    return hole_size_limit == other.hole_size_limit &&
+           range_size_limit == other.range_size_limit;
+  }
+
+  /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
+  ///
+  /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in
+  ///   milliseconds, i.e. the call setup latency of a new S3 request.
+  ///   The value is a positive integer.
+  /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec.
+  ///   The value is a positive integer.
+  /// \param[in] ideal_bandwidth_utilization_frac Transfer bandwidth utilization fraction
+  ///   (per connection) to maximize the net data load.
+  ///   The value is a positive double precision number less than 1.
+  /// \param[in] max_ideal_request_size_mib The maximum single data request size (in MiB)
+  ///   to maximize the net data load.
+  ///   The value is a positive integer.
+  /// \return A new instance of CacheOptions.
+  static CacheOptions MakeFromNetworkMetrics(
+      int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
+      double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac,
+      int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);
+
   static CacheOptions Defaults();
 };
 
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index feda6c9..39e3516 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <chrono>
+#include <cmath>
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
@@ -487,5 +488,28 @@ TEST(RangeReadCache, Basics) {
   ASSERT_RAISES(Invalid, cache.Read({25, 2}));
 }
 
+TEST(CacheOptions, Basics) {
+  auto check = [](const CacheOptions actual, const double expected_hole_size_limit_MiB,
+                  const double expected_range_size_limit_MiB) -> void {
+    const CacheOptions expected = {
+        static_cast<int64_t>(std::round(expected_hole_size_limit_MiB * 1024 * 1024)),
+        static_cast<int64_t>(std::round(expected_range_size_limit_MiB * 1024 * 1024))};
+    ASSERT_EQ(actual, expected);
+  };
+
+  // Test: normal usage.
+  // TTFB = 5 ms, BW = 500 MiB/s,
+  // we expect hole_size_limit = 2.5 MiB, and range_size_limit = 22.5 MiB
+  check(CacheOptions::MakeFromNetworkMetrics(5, 500), 2.5, 22.5);
+  // Test: custom bandwidth utilization.
+  // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%,
+  // we expect range_size_limit to change to 7.5 MiB.
+  check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75), 2.5, 7.5);
+  // Test: custom max_ideal_request_size, range_size_limit gets capped.
+  // TTFB = 5 ms, BW = 500 MiB/s, BW_utilization = 75%, max_ideal_request_size = 5 MiB,
+  // we expect the range_size_limit to be capped at 5 MiB.
+  check(CacheOptions::MakeFromNetworkMetrics(5, 500, .75, 5), 2.5, 5);
+}
+
 }  // namespace io
 }  // namespace arrow