You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/25 02:36:00 UTC

[GitHub] [arrow] cyb70289 opened a new pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

cyb70289 opened a new pull request #9310:
URL: https://github.com/apache/arrow/pull/9310


   t-Digest is a data structure to approximate accurate quantiles of
   arbitrary length dataset using constant space.
   This utility will be used in implementing approximate quantile kernel
   and latency estimation in flightrpc benchmark.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-766570951


   mingw64 ci failure is not relevant


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565181234



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);

Review comment:
       Good suggestion.
   This way I will rename `q` to something like `weight_so_far`.
   As this function only does validation, looks not very important. And roundoff error is this case should be small enough?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565115252



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {

Review comment:
       Leave only `unique_ptr` version.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565118795



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";

Review comment:
       Removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-766570951


   mingw64 ci failure is not relevant


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565119802



##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565131650



##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // append random quantiles to test
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in ci
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    td.Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin(), tds.begin() + 1);
+    td->Merge(tds);
+    td->Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {
+      // {size, min, max}
+      {2000, 1e8, 1e9}, {0, 0, 0}, {3000, -1, 1}, {500, -1e6, -1e5}, {800, 100, 100},
+  };
+  std::vector<std::vector<double>> values_vector;
+  for (const auto& cfg : configs) {
+    std::vector<double> values;
+    random_real(static_cast<size_t>(cfg[0]), 0x11223344, cfg[1], cfg[2], &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+TEST(TDigestTest, Misc) {
+  const size_t size = 100000;
+  const double min = -1000, max = 1000;
+  const std::vector<double> quantiles = {0, 0.01, 0.1, 0.4, 0.7, 0.9, 0.99, 1};
+
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  // test small delta and buffer
+  {
+    const double error_ratio = 0.15;  // low accuracy for small delta
+
+    TDigest td(10, 50);
+    for (double value : values) {
+      td.Add(value);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    for (double q : quantiles) {
+      const double truth = ExactQuantile(values, {q})[0];

Review comment:
       Good point.
   On my desktop, test time drops from 108ms to 22ms after the change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r564978390



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {

Review comment:
       It's the baseline to evaluate accuracy and performance of scale functions.
   Have to modify source code manually to enable it.
   I would like to keep it as new scale functions may be introduced in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565112495



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks

Review comment:
       Changed to indices. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-766501335


   https://issues.apache.org/jira/browse/ARROW-11367


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565112232



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";

Review comment:
       Removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565114525



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {

Review comment:
       Yes, it should always be true.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r564981864



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;

Review comment:
       No. I didn't consider roundoff errors.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-768248411


   @cyb70289 Feel free to ping when this is ready to review again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-768926833


   I rebased and force-pushed in order to trigger CI.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565111633



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #9310:
URL: https://github.com/apache/arrow/pull/9310


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565166775



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";
+      return NAN;
+    }
+
+    const double index = q * total_weight_;
+    if (index <= 1) {
+      return min_;
+    } else if (index >= total_weight_ - 1) {
+      return max_;
+    }
+
+    // find centroid contains the index
+    uint32_t ci = 0;
+    double weight_sum = 0;
+    for (; ci < td.size(); ++ci) {
+      weight_sum += td[ci].weight;
+      if (index <= weight_sum) {
+        break;
+      }
+    }
+    DCHECK_LT(ci, td.size());
+
+    // deviation of index from the centroid center
+    double diff = index + td[ci].weight / 2 - weight_sum;
+
+    // index happen to be in a unit weight centroid
+    if (td[ci].weight == 1 && std::abs(diff) < 0.5) {
+      return td[ci].mean;
+    }
+
+    // find adjacent centroids for interpolation
+    uint32_t ci_left = ci, ci_right = ci;
+    if (diff > 0) {
+      if (ci_right == td.size() - 1) {
+        // index larger than center of last bin
+        DCHECK_EQ(weight_sum, total_weight_);
+        const detail::Centroid* c = &td[ci_right];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(c->mean, max_, diff / (c->weight / 2));
+      }
+      ++ci_right;
+    } else {
+      if (ci_left == 0) {
+        // index smaller than center of first bin
+        const detail::Centroid* c = &td[0];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(min_, c->mean, index / (c->weight / 2));
+      }
+      --ci_left;
+      diff += td[ci_left].weight / 2 + td[ci_right].weight / 2;

Review comment:
       Good catch!
   I think they are both faithful to math. `diff` is finally the normalized distance from quantile point to left centroid (taken distance from left to right centroids as 1). Current code calculates it by the definition. Your suggestion calculates it as `1 - distance-to-right-centroid`.
   Suggested code is more concise, drop that somewhat duplicated line, add one simple line `diff += (diff <= 0)`).
   Current code is more straightforward for me. I would prefer keeping current code. And I guess compiler can do `common subexpression elimination` to avoid duplicated calculation?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565118552



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";

Review comment:
       Return Status<double> looks kind of tedious. I removed the error messages, simply return NAN on invalid inputs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-769028946


   CI failure is not relevant


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-770551351


   https://github.com/apache/arrow/pull/9384


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r564696684



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);

Review comment:
       `td.back().Merge(centroid)`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {

Review comment:
       Why is everything below exposed in this `.h` file?
   
   It seems that only the `TDigest` class needs to be exposed, and only `TDigest::Add(value)` needs to be inlined. All the rest can be hidden in a `.cc` file, reducing inclusions and achieving better modularity.
   

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);

Review comment:
       Hmm... if you want to minimize errors, perhaps you should have:
   ```c++
   const double q = q_prev + td[i].weight;
   const double k = this->K(q / total_weight);
   ```
   ?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight

Review comment:
       `std::cerr` would be better. 

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {

Review comment:
       Also, should this be `const`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {

Review comment:
       Return a `Status`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {

Review comment:
       This doesn't seem used anywhere. Is there a reason to keep it?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;

Review comment:
       I suppose this formula is meant to avoid accumulating errors?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";

Review comment:
       I don't think we want to log this, especially as Arrow is a library.

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {

Review comment:
       Is this equality really supposed to hold exactly?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";

Review comment:
       We don't want to warn if it's ok to have an empty digest.

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {

Review comment:
       Return a `Status` instead, so as to return the error message?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {

Review comment:
       Why do you need both `Merge(vector<TDigest*>&)` and `Merge(vector<std::unique_ptr<TDigest>>&)`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";

Review comment:
       It seems like either this function should return `Status<double>`, or parameters should be checked by the caller.

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks

Review comment:
       Note you could use indices instead of pointers.

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {

Review comment:
       `const auto&`

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";

Review comment:
       This doesn't seem really useful, is it?

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {

Review comment:
       `const std::vector<double>& quantiles`?

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // append random quantiles to test
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in ci
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    td.Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin(), tds.begin() + 1);
+    td->Merge(tds);
+    td->Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {

Review comment:
       Instead of `std::vector<double>`, use either a `std::tuple<>` or a dedicated `struct`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
+                << std::endl;
+    }
+    std::cout << "min = " << min_ << ", max = " << max_ << std::endl;
+    const size_t total_memory =
+        2 * td.capacity() * sizeof(detail::Centroid) + input_.capacity() * sizeof(double);
+    std::cout << "total memory = " << total_memory << " bytes" << std::endl;
+  }
+
+  // buffer a single data point, consume internal buffer if full
+  // this function is intensively called and performance critical
+  void Add(double value) {
+    DCHECK(!std::isnan(value)) << "cannot add NAN";
+    if (ARROW_PREDICT_FALSE(input_.size() == input_.capacity())) {
+      MergeInput();
+    }
+    input_.push_back(value);
+  }
+
+  // merge with other t-digests, called infrequently
+  void Merge(std::vector<TDigest*>& tdigests) {
+    // current and end iterator
+    using CentroidIter = std::vector<detail::Centroid>::const_iterator;
+    using CentroidIterPair = std::pair<CentroidIter, CentroidIter>;
+    // use a min-heap to find next minimal centroid from all tdigests
+    auto centroid_gt = [](const CentroidIterPair& lhs, const CentroidIterPair& rhs) {
+      return lhs.first->mean > rhs.first->mean;
+    };
+    using CentroidQueue =
+        std::priority_queue<CentroidIterPair, std::vector<CentroidIterPair>,
+                            decltype(centroid_gt)>;
+
+    // trivial dynamic memory allocated at runtime
+    std::vector<CentroidIterPair> queue_buffer;
+    queue_buffer.reserve(tdigests.size() + 1);
+    CentroidQueue queue(std::move(centroid_gt), std::move(queue_buffer));
+
+    MergeInput();
+    if (tdigest_->size() > 0) {
+      queue.emplace(tdigest_->cbegin(), tdigest_->cend());
+    }
+    for (TDigest* td : tdigests) {
+      td->MergeInput();
+      if (td->tdigest_->size() > 0) {
+        queue.emplace(td->tdigest_->cbegin(), td->tdigest_->cend());
+        total_weight_ += td->total_weight_;
+        min_ = std::min(min_, td->min_);
+        max_ = std::max(max_, td->max_);
+      }
+    }
+
+    merger_.Reset(total_weight_, tdigest_next_);
+
+    CentroidIter current_iter, end_iter;
+    // do k-way merge till one buffer left
+    while (queue.size() > 1) {
+      std::tie(current_iter, end_iter) = queue.top();
+      merger_.Add(*current_iter);
+      queue.pop();
+      if (++current_iter != end_iter) {
+        queue.emplace(current_iter, end_iter);
+      }
+    }
+    // merge last buffer
+    if (!queue.empty()) {
+      std::tie(current_iter, end_iter) = queue.top();
+      while (current_iter != end_iter) {
+        merger_.Add(*current_iter++);
+      }
+    }
+
+    std::swap(tdigest_, tdigest_next_);
+  }
+
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {
+    std::vector<TDigest*> tdigests;
+    tdigests.reserve(ptrs.size());
+    for (auto& ptr : ptrs) {
+      tdigests.push_back(ptr.get());
+    }
+    Merge(tdigests);
+  }
+
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";
+      return NAN;
+    }
+
+    const double index = q * total_weight_;
+    if (index <= 1) {
+      return min_;
+    } else if (index >= total_weight_ - 1) {
+      return max_;
+    }
+
+    // find centroid contains the index
+    uint32_t ci = 0;
+    double weight_sum = 0;
+    for (; ci < td.size(); ++ci) {
+      weight_sum += td[ci].weight;
+      if (index <= weight_sum) {
+        break;
+      }
+    }
+    DCHECK_LT(ci, td.size());
+
+    // deviation of index from the centroid center
+    double diff = index + td[ci].weight / 2 - weight_sum;
+
+    // index happen to be in a unit weight centroid
+    if (td[ci].weight == 1 && std::abs(diff) < 0.5) {
+      return td[ci].mean;
+    }
+
+    // find adjacent centroids for interpolation
+    uint32_t ci_left = ci, ci_right = ci;
+    if (diff > 0) {
+      if (ci_right == td.size() - 1) {
+        // index larger than center of last bin
+        DCHECK_EQ(weight_sum, total_weight_);
+        const detail::Centroid* c = &td[ci_right];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(c->mean, max_, diff / (c->weight / 2));
+      }
+      ++ci_right;
+    } else {
+      if (ci_left == 0) {
+        // index smaller than center of first bin
+        const detail::Centroid* c = &td[0];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(min_, c->mean, index / (c->weight / 2));
+      }
+      --ci_left;
+      diff += td[ci_left].weight / 2 + td[ci_right].weight / 2;

Review comment:
       It seems this would be the same as doing `if (diff < 0) { diff += 1; }` below?
   (not sure which one is more readable or more faithful to the underlying math)

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // append random quantiles to test
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in ci
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    td.Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin(), tds.begin() + 1);
+    td->Merge(tds);
+    td->Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {
+      // {size, min, max}
+      {2000, 1e8, 1e9}, {0, 0, 0}, {3000, -1, 1}, {500, -1e6, -1e5}, {800, 100, 100},
+  };
+  std::vector<std::vector<double>> values_vector;
+  for (const auto& cfg : configs) {
+    std::vector<double> values;
+    random_real(static_cast<size_t>(cfg[0]), 0x11223344, cfg[1], cfg[2], &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+TEST(TDigestTest, Misc) {
+  const size_t size = 100000;
+  const double min = -1000, max = 1000;
+  const std::vector<double> quantiles = {0, 0.01, 0.1, 0.4, 0.7, 0.9, 0.99, 1};
+
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  // test small delta and buffer
+  {
+    const double error_ratio = 0.15;  // low accuracy for small delta
+
+    TDigest td(10, 50);
+    for (double value : values) {
+      td.Add(value);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    for (double q : quantiles) {
+      const double truth = ExactQuantile(values, {q})[0];

Review comment:
       If you call `ExactQuantile(values, quantiles)` once, this will spare multiple sortings of the same data.
   
   (this test takes ~400 ms here, which is a bit long)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565137291



##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // append random quantiles to test
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in ci
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    td.Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin(), tds.begin() + 1);
+    td->Merge(tds);
+    td->Verify();
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {

Review comment:
       std::tuple used




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-766501335


   https://issues.apache.org/jira/browse/ARROW-11367


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565112063



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";

Review comment:
       Removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] tdunning commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
tdunning commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r566573254



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;

Review comment:
       This code is copied from the original Java and is an implementation of Welford's method for avoiding roundoff in computing variance. The method includes a computation of the mean.
   
   See https://jonisalonen.com/2013/deriving-welfords-method-for-computing-variance/  https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-769088194


   CI failures are unrelated, will merge.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-769045135


   I've started a Travis-CI build on my fork: https://travis-ci.com/github/pitrou/arrow/builds/215068826


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 removed a comment on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 removed a comment on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-766570951


   mingw64 ci failure is not relevant


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565114893



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {
+      ARROW_LOG(ERROR) << "tdigest total weight mismatch";
+      return false;
+    }
+    // check if buffer expanded
+    if (tdigest_->capacity() > delta_) {
+      ARROW_LOG(ERROR) << "oversized tdigest buffer";
+      return false;
+    }
+    // check k-size
+    return merger_.Verify(tdigest_, total_weight_);
+  }
+
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565111868



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565111310



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-768814898


   @pitrou , code should be okay to review now. Looks CI system has some problem, only two jobs are started.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-770508361


   will look into this issue


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] kou commented on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
kou commented on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-770458676


   It seems that `tdigest_test.cc` can't built with g++ 4.8.2. gandiva-jar-ubuntu nightly build is failing:
   
   https://github.com/ursacomputing/crossbow/runs/1800166679#step:3:948
   
   ```text
   FAILED: src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o 
   /usr/bin/ccache /opt/rh/devtoolset-2/root/usr/bin/c++ -DARROW_HAVE_RUNTIME_SSE4_2 -DARROW_HAVE_SSE4_2 -DARROW_JEMALLOC -DARROW_JEMALLOC_INCLUDE_DIR="" -DARROW_WITH_RE2 -DARROW_WITH_TIMING_TESTS -DURI_STATIC_BUILD -Isrc -I/arrow/cpp/src -I/arrow/cpp/src/generated -isystem /arrow/cpp/thirdparty/flatbuffers/include -isystem /arrow_boost_dist/include -isystem gflags_ep-prefix/src/gflags_ep/include -isystem jemalloc_ep-prefix/src -isystem rapidjson_ep/src/rapidjson_ep-install/include -isystem re2_ep-install/include -isystem /arrow/cpp/thirdparty/hadoop/include -O3 -DNDEBUG  -Wall -Wno-attributes -msse4.2  -O3 -DNDEBUG -fPIE -pthread -std=c++11 -MD -MT src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o -MF src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o.d -o src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o -c /arrow/cpp/src/arrow/util/tdigest_test.cc
   /arrow/cpp/src/arrow/util/tdigest_test.cc: In member function ‘virtual void arrow::internal::TDigestTest_MergeNonUniform_Test::TestBody()’:
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, double, double}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
      };
      ^
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, double, double}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] kou edited a comment on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
kou edited a comment on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-770458676


   It seems that `tdigest_test.cc` can't be built with g++ 4.8.2. gandiva-jar-ubuntu nightly build is failing:
   
   https://github.com/ursacomputing/crossbow/runs/1800166679#step:3:948
   
   ```text
   FAILED: src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o 
   /usr/bin/ccache /opt/rh/devtoolset-2/root/usr/bin/c++ -DARROW_HAVE_RUNTIME_SSE4_2 -DARROW_HAVE_SSE4_2 -DARROW_JEMALLOC -DARROW_JEMALLOC_INCLUDE_DIR="" -DARROW_WITH_RE2 -DARROW_WITH_TIMING_TESTS -DURI_STATIC_BUILD -Isrc -I/arrow/cpp/src -I/arrow/cpp/src/generated -isystem /arrow/cpp/thirdparty/flatbuffers/include -isystem /arrow_boost_dist/include -isystem gflags_ep-prefix/src/gflags_ep/include -isystem jemalloc_ep-prefix/src -isystem rapidjson_ep/src/rapidjson_ep-install/include -isystem re2_ep-install/include -isystem /arrow/cpp/thirdparty/hadoop/include -O3 -DNDEBUG  -Wall -Wno-attributes -msse4.2  -O3 -DNDEBUG -fPIE -pthread -std=c++11 -MD -MT src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o -MF src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o.d -o src/arrow/util/CMakeFiles/arrow-utility-test.dir/tdigest_test.cc.o -c /arrow/cpp/src/arrow/util/tdigest_test.cc
   /arrow/cpp/src/arrow/util/tdigest_test.cc: In member function ‘virtual void arrow::internal::TDigestTest_MergeNonUniform_Test::TestBody()’:
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, double, double}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
      };
      ^
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, double, double}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   /arrow/cpp/src/arrow/util/tdigest_test.cc:217:3: error: converting to ‘std::tuple<long unsigned int, double, double>’ from initializer list would use explicit constructor ‘constexpr std::tuple< <template-parameter-1-1> >::tuple(_UElements&& ...) [with _UElements = {int, int, int}; <template-parameter-2-2> = void; _Elements = {long unsigned int, double, double}]’
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 edited a comment on pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 edited a comment on pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#issuecomment-770551351


   https://github.com/apache/arrow/pull/9384 fixes this bug


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565120398



##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*, which depends on input data and quantiles to be calculated. I also
+// find small gaps among linux/windows/macos.
+// In below tests, most quantiles are within 1% deviation from exact values,
+// while the worst test case is about 10% drift.
+// To make test result stable, I relaxed error bound to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal to same single vaue
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantile at 0.1 intervanl, test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r564736751



##########
File path: cpp/src/arrow/util/tdigest_benchmark.cc
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace util {
+
+static constexpr uint32_t kDelta = 100;
+static constexpr uint32_t kBufferSize = 500;
+
+static void BenchmarkTDigest(benchmark::State& state) {
+  const size_t items = state.range(0);
+  std::vector<double> values;
+  random_real(items, 0x11223344, -12345678.0, 12345678.0, &values);
+
+  for (auto _ : state) {
+    arrow::internal::TDigest td(kDelta, kBufferSize);
+    for (double value : values) {
+      td.Add(value);
+    }
+    benchmark::DoNotOptimize(td.Quantile(0));
+  }
+  state.SetItemsProcessed(state.iterations() * items);
+}
+
+BENCHMARK(BenchmarkTDigest)->Arg(1 << 16)->Arg(1 << 20)->Arg(1 << 24);

Review comment:
       We should probably use smaller sizes in order to get enough iterations (the larger size here only gets 1 iteration here).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565140349



##########
File path: cpp/src/arrow/util/tdigest_benchmark.cc
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace util {
+
+static constexpr uint32_t kDelta = 100;
+static constexpr uint32_t kBufferSize = 500;
+
+static void BenchmarkTDigest(benchmark::State& state) {
+  const size_t items = state.range(0);
+  std::vector<double> values;
+  random_real(items, 0x11223344, -12345678.0, 12345678.0, &values);
+
+  for (auto _ : state) {
+    arrow::internal::TDigest td(kDelta, kBufferSize);
+    for (double value : values) {
+      td.Add(value);
+    }
+    benchmark::DoNotOptimize(td.Quantile(0));
+  }
+  state.SetItemsProcessed(state.iterations() * items);
+}
+
+BENCHMARK(BenchmarkTDigest)->Arg(1 << 16)->Arg(1 << 20)->Arg(1 << 24);

Review comment:
       Removed `1 << 24`, added `1 << 12`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r565114100



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);
+    } else {
+      const double quantile = weight_so_far_ / total_weight_;
+      const double next_weight_limit = total_weight_ * this->Q(this->K(quantile) + 1);
+      // weight limit should be strictly increasing, until the last centroid
+      if (next_weight_limit <= weight_limit_) {
+        weight_limit_ = total_weight_;
+      } else {
+        weight_limit_ = next_weight_limit;
+      }
+      td.push_back(centroid);  // should never exceed capacity and trigger reallocation
+    }
+    weight_so_far_ = weight;
+  }
+
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);
+      if (td[i].weight != 1 && (k - k_prev) > 1.001) {
+        ARROW_LOG(ERROR) << "oversized centroid, k2 - k1 = " << (k - k_prev);
+        return false;
+      }
+      k_prev = k;
+      q_prev = q;
+    }
+    return true;
+  }
+
+ private:
+  double total_weight_;   // total weight of this tdigest
+  double weight_so_far_;  // accumulated weight till current bin
+  double weight_limit_;   // max accumulated weight to move to next bin
+  std::vector<Centroid>* tdigest_;
+};
+
+}  // namespace detail
+
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";
+    }
+
+    // pre-allocate input and tdigest buffers, no dynamic memory allocation at runtime
+    input_.reserve(buffer_size);
+    tdigest_buffers_[0].reserve(delta);
+    tdigest_buffers_[1].reserve(delta);
+    tdigest_ = &tdigest_buffers_[0];
+    tdigest_next_ = &tdigest_buffers_[1];
+
+    Reset();
+  }
+
+  // no copy/move/assign due to some dirty pointer tricks
+  TDigest(const TDigest&) = delete;
+  TDigest(TDigest&&) = delete;
+  TDigest& operator=(const TDigest&) = delete;
+  TDigest& operator=(TDigest&&) = delete;
+
+  // reset and re-use this tdigest
+  void Reset() {
+    input_.resize(0);
+    tdigest_->resize(0);
+    tdigest_next_->resize(0);
+    total_weight_ = 0;
+    min_ = std::numeric_limits<double>::max();
+    max_ = std::numeric_limits<double>::lowest();
+  }
+
+  // verify data integrity, only for test
+  bool Verify() {

Review comment:
       Done with returning Status.
   It will merge input buffer if it's not empty before do verification. So it's not declared as `const`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org