Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/26 18:26:44 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #9310: ARROW-11367: [C++] Implement t-digest approximate quantile utility

pitrou commented on a change in pull request #9310:
URL: https://github.com/apache/arrow/pull/9310#discussion_r564696684



##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// approximate quantiles from arbitrary length dataset with O(1) space
+// based on 'Computing Extremely Accurate Quantiles Using t-Digests' from Dunning & Ertl
+// - https://arxiv.org/abs/1902.04023
+// - https://github.com/tdunning/t-digest
+
+#pragma once
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {
+
+// a numerically stable lerp is unbelievably complex
+// but we are *approximating* the quantile, so let's keep it simple
+double Lerp(double a, double b, double t) { return a + t * (b - a); }
+
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;
+  }
+};
+
+// scale function K0: linear function, as baseline
+struct ScalerK0 {
+  explicit ScalerK0(uint32_t delta) : delta_norm(delta / 2.0) {}
+
+  double K(double q) const { return delta_norm * q; }
+  double Q(double k) const { return k / delta_norm; }
+
+  const double delta_norm;
+};
+
+// scale function K1
+struct ScalerK1 {
+  explicit ScalerK1(uint32_t delta) : delta_norm(delta / (2.0 * M_PI)) {}
+
+  double K(double q) const { return delta_norm * std::asin(2 * q - 1); }
+  double Q(double k) const { return (std::sin(k / delta_norm) + 1) / 2; }
+
+  const double delta_norm;
+};
+
+// implements t-digest merging algorithm
+template <class T = ScalerK1>
+class TDigestMerger : private T {
+ public:
+  explicit TDigestMerger(uint32_t delta) : T(delta) {}
+
+  void Reset(double total_weight, std::vector<Centroid>* tdigest) {
+    total_weight_ = total_weight;
+    tdigest_ = tdigest;
+    tdigest_->resize(0);
+    weight_so_far_ = 0;
+    weight_limit_ = -1;  // trigger first centroid merge
+  }
+
+  // merge one centroid from a sorted centroid stream
+  void Add(const Centroid& centroid) {
+    auto& td = *tdigest_;
+    const double weight = weight_so_far_ + centroid.weight;
+    if (weight <= weight_limit_) {
+      td[td.size() - 1].Merge(centroid);

Review comment:
       `td.back().Merge(centroid)`?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <vector>
+
+#include "arrow/util/logging.h"
+
+#ifndef M_PI
+#define M_PI 3.14159265358979323846
+#endif
+
+namespace arrow {
+namespace internal {
+
+namespace detail {

Review comment:
       Why is everything below exposed in this `.h` file?
   
   It seems that only the `TDigest` class needs to be exposed, and only `TDigest::Add(value)` needs to be inlined. All the rest can be hidden in a `.cc` file, reducing inclusions and achieving better modularity.
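
    A possible shape for the slimmed-down header, as a rough sketch (the pimpl split and all names below are illustrative, not the PR's actual layout):

    ```c++
    // tdigest.h -- public surface only; everything else moves to tdigest.cc
    #pragma once

    #include <cstdint>
    #include <memory>
    #include <vector>

    namespace arrow {
    namespace internal {

    class TDigest {
     public:
      explicit TDigest(uint32_t delta = 100, uint32_t buffer_size = 500);
      ~TDigest();

      // the hot path stays inline: buffer one value, flush when full
      void Add(double value) {
        if (input_.size() == input_.capacity()) {
          MergeInput();  // out-of-line, defined in tdigest.cc
        }
        input_.push_back(value);
      }

      double Quantile(double q);

     private:
      void MergeInput();  // heavy lifting lives in tdigest.cc

      std::vector<double> input_;
      struct Impl;                  // Centroid, scalers, merger hidden here
      std::unique_ptr<Impl> impl_;  // keeps <queue> and <cmath> out of the header
    };

    }  // namespace internal
    }  // namespace arrow
    ```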
   

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {
+    const auto& td = *tdigest;
+    double q_prev = 0, k_prev = this->K(0);
+    for (size_t i = 0; i < td.size(); ++i) {
+      const double q = q_prev + td[i].weight / total_weight;
+      const double k = this->K(q);

Review comment:
       Hmm... if you want to minimize errors, perhaps you should have:
   ```c++
   const double q = q_prev + td[i].weight;
   const double k = this->K(q / total_weight);
   ```
   ?
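
    Spelled out, the loop under that change might read as follows (a sketch only; `Scaler` stands in for the K/Q mixin and `Centroid` is the struct defined above):

    ```c++
    // hypothetical standalone version of TDigestMerger::Verify, accumulating
    // raw weight and dividing only once per iteration
    template <class Scaler>
    bool VerifyKSize(const Scaler& scaler, const std::vector<Centroid>& td,
                     double total_weight) {
      double weight_so_far = 0;
      double k_prev = scaler.K(0);
      for (size_t i = 0; i < td.size(); ++i) {
        weight_so_far += td[i].weight;  // running sum of raw weights
        const double k = scaler.K(weight_so_far / total_weight);  // one division
        if (td[i].weight != 1 && (k - k_prev) > 1.001) {
          return false;  // oversized centroid
        }
        k_prev = k;
      }
      return true;
    }
    ```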

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // dump internal data, only for debug
+  void Dump() {
+    MergeInput();
+    const auto& td = *tdigest_;
+    for (size_t i = 0; i < td.size(); ++i) {
+      std::cout << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight

Review comment:
       `std::cerr` would be better. 
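
    For illustration, the same loop writing to the unbuffered error stream (assuming `<iostream>` is included for the debug build):

    ```c++
    // debug dump to stderr, so output is not lost in buffered stdout on a crash
    void Dump() {
      MergeInput();
      const auto& td = *tdigest_;
      for (size_t i = 0; i < td.size(); ++i) {
        std::cerr << i << ": mean = " << td[i].mean << ", weight = " << td[i].weight
                  << std::endl;
      }
      std::cerr << "min = " << min_ << ", max = " << max_ << std::endl;
    }
    ```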

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // verify data integrity, only for test
+  bool Verify() {

Review comment:
       Also, should this be `const`?
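
    Since `Verify()` starts with a `MergeInput()` call, making it `const` would need the "logically const" pattern — a minimal self-contained sketch, assuming the buffers are treated as caches:

    ```c++
    #include <vector>

    // illustrative only: flushing buffered input changes the representation,
    // not the logical value of the digest, so the caches can be `mutable`
    class TDigestSketch {
     public:
      bool Verify() const {
        MergeInput();  // fine: only mutable members are modified
        return !merged_.empty();
      }

     private:
      void MergeInput() const { merged_ = input_; }  // stand-in for the real merge

      std::vector<double> input_{1.0, 2.0};
      mutable std::vector<double> merged_;  // cache, not logical state
    };
    ```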

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // verify data integrity, only for test
+  bool Verify() {

Review comment:
       Return a `Status`?
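
    A sketch of what a `Status`-returning variant could look like, mirroring the checks above (the method name `Validate` is hypothetical, chosen to match Arrow conventions; this assumes the same members as in the PR):

    ```c++
    #include "arrow/status.h"

    Status Validate() {
      MergeInput();
      double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
      for (const auto& centroid : *tdigest_) {
        if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
          return Status::Invalid("NAN found in tdigest");
        }
        if (centroid.mean < prev_mean) {
          return Status::Invalid("centroid mean decreases");
        }
        if (centroid.weight < 1) {
          return Status::Invalid("invalid centroid weight");
        }
        prev_mean = centroid.mean;
        total_weight += centroid.weight;
      }
      if (total_weight != total_weight_) {
        return Status::Invalid("tdigest total weight mismatch");
      }
      return Status::OK();
    }
    ```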

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// scale function K0: linear function, as baseline
+struct ScalerK0 {

Review comment:
       This doesn't seem used anywhere. Is there a reason to keep it?

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+// histogram bin
+struct Centroid {
+  double mean;
+  double weight;  // # data points in this bin
+
+  // merge with another centroid
+  void Merge(const Centroid& centroid) {
+    weight += centroid.weight;
+    mean += (centroid.mean - mean) * centroid.weight / weight;

Review comment:
       I suppose this formula is meant to avoid accumulating errors?
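
    It is the weighted form of the incremental (Welford-style) mean update: the correction term is bounded by |centroid.mean - mean|, so no large `mean * weight` intermediate product is ever formed. A small self-contained comparison, purely for illustration:

    ```c++
    #include <cstdio>

    // same update rule as Centroid::Merge
    struct Bin {
      double mean = 0, weight = 0;
      void Merge(const Bin& in) {
        weight += in.weight;
        mean += (in.mean - mean) * in.weight / weight;
      }
    };

    int main() {
      // the naive form (mean * old_weight + in.mean * in.weight) / new_weight
      // builds a huge intermediate product here; the incremental form does not
      Bin b{1e15, 1e9};
      b.Merge({2e15, 1.0});
      std::printf("%.17g\n", b.mean);  // barely above 1e15, as expected
      return 0;
    }
    ```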

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";
+      delta = 10;
+    }
+    if (buffer_size < 50 || buffer_size < delta) {
+      ARROW_LOG(INFO) << "increase buffer size may improve performance";

Review comment:
       I don't think we want to log this, especially as Arrow is a library.

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // verify data integrity, only for test
+  bool Verify() {
+    MergeInput();
+    // check weight, centroid order
+    double total_weight = 0, prev_mean = std::numeric_limits<double>::lowest();
+    for (const auto& centroid : *tdigest_) {
+      if (std::isnan(centroid.mean) || std::isnan(centroid.weight)) {
+        ARROW_LOG(ERROR) << "NAN found in tdigest";
+        return false;
+      }
+      if (centroid.mean < prev_mean) {
+        ARROW_LOG(ERROR) << "centroid mean decreases";
+        return false;
+      }
+      if (centroid.weight < 1) {
+        ARROW_LOG(ERROR) << "invalid centroid weight";
+        return false;
+      }
+      prev_mean = centroid.mean;
+      total_weight += centroid.weight;
+    }
+    if (total_weight != total_weight_) {

Review comment:
       Is this equality really supposed to hold exactly?
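
    If not (the weights are doubles and the summation order can differ), a relative-tolerance comparison is the usual fallback — a sketch, with the tolerance picked arbitrarily here:

    ```c++
    #include <cmath>

    // approximate equality with a relative tolerance (1e-9 chosen arbitrarily)
    bool WeightsMatch(double observed, double expected) {
      return std::fabs(observed - expected) <=
             1e-9 * std::fmax(std::fabs(observed), std::fabs(expected));
    }
    ```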

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";

Review comment:
       We don't want to warn if it's ok to have an empty digest.
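
    i.e. the empty case could return quietly — a sketch of just that branch inside `Quantile`:

    ```c++
    // an empty digest is a legitimate state: no data simply means no answer
    if (td.size() == 0) {
      return NAN;
    }
    ```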

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // verify k-size of a tdigest
+  bool Verify(const std::vector<Centroid>* tdigest, double total_weight) const {

Review comment:
       Return a `Status` instead, so as to return the error message?
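
       A sketch of the Status-returning form, assuming `arrow::Status` from `arrow/status.h` (the free-function shape and name are hypothetical):

```cpp
#include "arrow/status.h"

// hypothetical Status-returning variant of TDigestMerger::Verify
template <class Scaler>
arrow::Status VerifyKSize(const Scaler& scaler, const std::vector<Centroid>& td,
                          double total_weight) {
  double q_prev = 0, k_prev = scaler.K(0);
  for (size_t i = 0; i < td.size(); ++i) {
    const double q = q_prev + td[i].weight / total_weight;
    const double k = scaler.K(q);
    if (td[i].weight != 1 && (k - k_prev) > 1.001) {
      return arrow::Status::Invalid("oversized centroid, k2 - k1 = ", k - k_prev);
    }
    k_prev = k;
    q_prev = q;
  }
  return arrow::Status::OK();
}
```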

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // an ugly helper
+  void Merge(std::vector<std::unique_ptr<TDigest>>& ptrs) {

Review comment:
       Why do you need both `Merge(vector<TDigest*>&)` and `Merge(vector<std::unique_ptr<TDigest>>&)`?
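
       One way to keep a single entry point (sketch): a template over any container of pointer-like elements; `MergeImpl` is a hypothetical name for the existing k-way merge body.

```cpp
template <typename TDigestPtrContainer>
void Merge(const TDigestPtrContainer& ptrs) {
  std::vector<TDigest*> tdigests;
  tdigests.reserve(ptrs.size());
  for (const auto& ptr : ptrs) {
    tdigests.push_back(&*ptr);  // works for TDigest* and unique_ptr<TDigest> alike
  }
  MergeImpl(tdigests);  // hypothetical: the body of Merge(vector<TDigest*>&)
}
```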

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";

Review comment:
       It seems like either this function should return an `arrow::Result<double>`, or parameters should be checked by the caller.
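
       A sketch of the first option, assuming `arrow::Result` from `arrow/result.h` (the wrapper name is hypothetical):

```cpp
#include "arrow/result.h"

// hypothetical checked variant; the existing Quantile assumes valid input
arrow::Result<double> QuantileChecked(TDigest& td, double q) {
  if (q < 0 || q > 1) {
    return arrow::Status::Invalid("quantile must be between 0 and 1, got ", q);
  }
  return td.Quantile(q);
}
```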

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // no copy/move/assign due to some dirty pointer tricks

Review comment:
       Note you could use indices instead of pointers.
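
       A sketch of the index-based layout; with an index, the compiler-generated copy and move operations become safe again (struct and member names are hypothetical):

```cpp
// select the current buffer by index instead of by pointer
struct BufferPair {
  std::vector<detail::Centroid> buffers[2];
  int current = 0;

  std::vector<detail::Centroid>& Tdigest() { return buffers[current]; }
  std::vector<detail::Centroid>& TdigestNext() { return buffers[1 - current]; }
  void Swap() { current = 1 - current; }  // replaces std::swap(tdigest_, tdigest_next_)
};
```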

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// XXX: There's no rigid error bound available. The accuracy is to some degree
+// *random*: it depends on the input data and on the quantiles to be calculated.
+// I also found small gaps among Linux/Windows/macOS.
+// In the tests below, most quantiles are within 1% deviation from the exact
+// values, while the worst case drifts by about 10%.
+// To make the test results stable, I relaxed the error bounds to be *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles equal the same single value
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantiles at 0.1 intervals; test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {

Review comment:
       `const auto&`
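
       I.e. (the loop does not modify `values`):

```cpp
for (const auto& values : values_vector) {
```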

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+class TDigest {
+ public:
+  explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
+      : delta_(delta), merger_(delta) {
+    if (delta < 10) {
+      ARROW_LOG(WARNING) << "delta is too small, increased to 10";

Review comment:
       This doesn't seem really useful, does it?
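
       A silent clamp in the initializer list would be enough (sketch); note that, as written, `delta_` and `merger_` are initialized before the body adjusts the local `delta`, so only the `reserve()` calls below see the clamped value anyway.

```cpp
explicit TDigest(uint32_t delta = kDelta, uint32_t buffer_size = kBufferSize)
    : delta_(std::max<uint32_t>(delta, 10)), merger_(std::max<uint32_t>(delta, 10)) {
  // body unchanged: reserve input/tdigest buffers, then Reset()
}
```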

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {

Review comment:
       `const std::vector<double>& quantiles`?
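
       I.e., pass the quantiles by const reference to avoid a copy (`values` is deliberately taken by value, since it is sorted in place):

```cpp
std::vector<double> ExactQuantile(std::vector<double> values,
                                  const std::vector<double>& quantiles) {
```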

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double> quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // append random quantiles to test
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in CI
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    EXPECT_TRUE(td.Verify());
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non-empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin(), tds.begin() + 1);
+    td->Merge(tds);
+    EXPECT_TRUE(td->Verify());
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {

Review comment:
       Instead of `std::vector<double>`, use either a `std::tuple<>` or a dedicated `struct`?
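
       A sketch with a dedicated struct; the field names and values below are placeholders, since the actual entries of `configs` are not visible in this hunk:

```cpp
// hypothetical field names, for illustration only
struct NonUniformConfig {
  double min;
  double max;
  size_t size;
};

const std::vector<NonUniformConfig> configs = {
    {-1e9, 1e9, 20000},
    {0.0, 1.0, 5000},
};
```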

##########
File path: cpp/src/arrow/util/tdigest.h
##########
@@ -0,0 +1,419 @@
+  // calculate quantile
+  double Quantile(double q) {
+    MergeInput();
+
+    const auto& td = *tdigest_;
+
+    if (q < 0 || q > 1) {
+      ARROW_LOG(ERROR) << "quantile must be between 0 and 1";
+      return NAN;
+    }
+    if (td.size() == 0) {
+      ARROW_LOG(WARNING) << "empty tdigest";
+      return NAN;
+    }
+
+    const double index = q * total_weight_;
+    if (index <= 1) {
+      return min_;
+    } else if (index >= total_weight_ - 1) {
+      return max_;
+    }
+
+    // find the centroid that contains the index
+    uint32_t ci = 0;
+    double weight_sum = 0;
+    for (; ci < td.size(); ++ci) {
+      weight_sum += td[ci].weight;
+      if (index <= weight_sum) {
+        break;
+      }
+    }
+    DCHECK_LT(ci, td.size());
+
+    // deviation of index from the centroid center
+    double diff = index + td[ci].weight / 2 - weight_sum;
+
+    // index happens to fall in a unit-weight centroid
+    if (td[ci].weight == 1 && std::abs(diff) < 0.5) {
+      return td[ci].mean;
+    }
+
+    // find adjacent centroids for interpolation
+    uint32_t ci_left = ci, ci_right = ci;
+    if (diff > 0) {
+      if (ci_right == td.size() - 1) {
+        // index larger than center of last bin
+        DCHECK_EQ(weight_sum, total_weight_);
+        const detail::Centroid* c = &td[ci_right];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(c->mean, max_, diff / (c->weight / 2));
+      }
+      ++ci_right;
+    } else {
+      if (ci_left == 0) {
+        // index smaller than center of first bin
+        const detail::Centroid* c = &td[0];
+        DCHECK_GE(c->weight, 2);
+        return detail::Lerp(min_, c->mean, index / (c->weight / 2));
+      }
+      --ci_left;
+      diff += td[ci_left].weight / 2 + td[ci_right].weight / 2;

Review comment:
       It seems this would be the same as doing `if (diff < 0) { diff += 1; }` below?
   (not sure which one is more readable or more faithful to the underlying math)
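
       For context, the interpolation that consumes `diff` lies below this hunk and is not quoted here; presumably it normalizes by the half-weights of the two adjacent centroids, roughly (assumption):

```cpp
// assumed final step (not part of the quoted hunk): after the adjustment above,
// diff is the offset from the left centroid's center, so interpolate between
// the two means, normalized by the half-weights
const double half_weights = (td[ci_left].weight + td[ci_right].weight) / 2;
return detail::Lerp(td[ci_left].mean, td[ci_right].mean, diff / half_weights);
```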

##########
File path: cpp/src/arrow/util/tdigest_test.cc
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// XXX: There is no rigid error bound available. The accuracy is to some
+// degree *random*: it depends on the input data and on the quantiles being
+// calculated. I also see small differences among Linux/Windows/macOS.
+// In the tests below, most quantiles are within 1% of the exact values,
+// while the worst case drifts by about 10%.
+// To keep the test results stable, the error bounds are relaxed to be
+// merely *good enough*.
+
+#include <algorithm>
+#include <cmath>
+#include <numeric>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/tdigest.h"
+
+namespace arrow {
+namespace internal {
+
+TEST(TDigestTest, SingleValue) {
+  const double value = 0.12345678;
+
+  TDigest td;
+  td.Add(value);
+  EXPECT_TRUE(td.Verify());
+  // all quantiles are equal to the same single value
+  for (double q = 0; q <= 1; q += 0.1) {
+    EXPECT_EQ(td.Quantile(q), value);
+  }
+}
+
+TEST(TDigestTest, FewValues) {
+  // exact quantiles at 0.1 intervals; test sorted and unsorted input
+  std::vector<std::vector<double>> values_vector = {
+      {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+      {4, 1, 9, 0, 3, 2, 5, 6, 8, 7, 10},
+  };
+
+  for (auto& values : values_vector) {
+    TDigest td;
+    for (double v : values) {
+      td.Add(v);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    double q = 0;
+    for (size_t i = 0; i < values.size(); ++i) {
+      double expected = static_cast<double>(i);
+      EXPECT_EQ(td.Quantile(q), expected);
+      q += 0.1;
+    }
+  }
+}
+
+// Calculate exact quantile as truth
+std::vector<double> ExactQuantile(std::vector<double> values,
+                                  const std::vector<double>& quantiles) {
+  std::sort(values.begin(), values.end());
+
+  std::vector<double> output;
+  for (double q : quantiles) {
+    const double index = (values.size() - 1) * q;
+    const int64_t lower_index = static_cast<int64_t>(index);
+    const double fraction = index - lower_index;
+    if (fraction == 0) {
+      output.push_back(values[lower_index]);
+    } else {
+      const double lerp =
+          fraction * values[lower_index + 1] + (1 - fraction) * values[lower_index];
+      output.push_back(lerp);
+    }
+  }
+  return output;
+}
+
+void TestRandom(size_t size) {
+  const std::vector<double> fixed_quantiles = {0, 0.01, 0.1, 0.2, 0.5, 0.8, 0.9, 0.99, 1};
+
+  // test the fixed quantiles plus 50 random ones
+  std::vector<double> quantiles;
+  random_real(50, 0x11223344, 0.0, 1.0, &quantiles);
+  quantiles.insert(quantiles.end(), fixed_quantiles.cbegin(), fixed_quantiles.cend());
+
+  // generate random test values
+  const double min = 1e3, max = 1e10;
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  TDigest td(200);
+  for (double value : values) {
+    td.Add(value);
+  }
+  EXPECT_TRUE(td.Verify());
+
+  std::vector<double> expected = ExactQuantile(values, quantiles);
+  std::vector<double> approximated;
+  for (auto q : quantiles) {
+    approximated.push_back(td.Quantile(q));
+  }
+
+  // r-square of expected and approximated quantiles should be greater than 0.999
+  const double expected_mean =
+      std::accumulate(expected.begin(), expected.end(), 0.0) / expected.size();
+  double rss = 0, tss = 0;
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    rss += (expected[i] - approximated[i]) * (expected[i] - approximated[i]);
+    tss += (expected[i] - expected_mean) * (expected[i] - expected_mean);
+  }
+  const double r2 = 1 - rss / tss;
+  EXPECT_GT(r2, 0.999);
+
+  // make sure no quantile drifts more than 5% from truth
+  for (size_t i = 0; i < quantiles.size(); ++i) {
+    const double tolerance = std::fabs(expected[i]) * 0.05;
+    EXPECT_NEAR(approximated[i], expected[i], tolerance) << quantiles[i];
+  }
+}
+
+TEST(TDigestTest, RandomValues) { TestRandom(100000); }
+
+// too heavy to run in CI
+TEST(TDigestTest, DISABLED_HugeVolume) { TestRandom(1U << 30); }
+
+void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t delta,
+               double error_ratio) {
+  const std::vector<double> quantiles = {0,   0.01, 0.1, 0.2, 0.3,  0.4, 0.5,
+                                         0.6, 0.7,  0.8, 0.9, 0.99, 1};
+
+  std::vector<std::unique_ptr<TDigest>> tds;
+  for (const auto& values : values_vector) {
+    auto td = make_unique<TDigest>(delta);
+    for (double value : values) {
+      td->Add(value);
+    }
+    EXPECT_TRUE(td->Verify());
+    tds.push_back(std::move(td));
+  }
+
+  std::vector<double> values_combined;
+  for (const auto& values : values_vector) {
+    values_combined.insert(values_combined.end(), values.begin(), values.end());
+  }
+  std::vector<double> expected = ExactQuantile(values_combined, quantiles);
+
+  // merge into an empty tdigest
+  {
+    TDigest td(delta);
+    td.Merge(tds);
+    EXPECT_TRUE(td.Verify());
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td.Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+
+  // merge into a non-empty tdigest
+  {
+    std::unique_ptr<TDigest> td = std::move(tds[0]);
+    tds.erase(tds.begin());
+    td->Merge(tds);
+    EXPECT_TRUE(td->Verify());
+    for (size_t i = 0; i < quantiles.size(); ++i) {
+      const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
+      EXPECT_NEAR(td->Quantile(quantiles[i]), expected[i], tolerance) << quantiles[i];
+    }
+  }
+}
+
+// merge tdigests with the same distribution
+TEST(TDigestTest, MergeUniform) {
+  const std::vector<size_t> sizes = {20000, 3000, 1500, 18000, 9999, 6666};
+  std::vector<std::vector<double>> values_vector;
+  for (auto size : sizes) {
+    std::vector<double> values;
+    random_real(size, 0x11223344, -123456789.0, 987654321.0, &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+// merge tdigests with different distributions
+TEST(TDigestTest, MergeNonUniform) {
+  const std::vector<std::vector<double>> configs = {
+      // {size, min, max}
+      {2000, 1e8, 1e9}, {0, 0, 0}, {3000, -1, 1}, {500, -1e6, -1e5}, {800, 100, 100},
+  };
+  std::vector<std::vector<double>> values_vector;
+  for (const auto& cfg : configs) {
+    std::vector<double> values;
+    random_real(static_cast<size_t>(cfg[0]), 0x11223344, cfg[1], cfg[2], &values);
+    values_vector.push_back(std::move(values));
+  }
+
+  TestMerge(values_vector, /*delta=*/200, /*error_ratio=*/0.05);
+}
+
+TEST(TDigestTest, Misc) {
+  const size_t size = 100000;
+  const double min = -1000, max = 1000;
+  const std::vector<double> quantiles = {0, 0.01, 0.1, 0.4, 0.7, 0.9, 0.99, 1};
+
+  std::vector<double> values;
+  random_real(size, 0x11223344, min, max, &values);
+
+  // test small delta and buffer
+  {
+    const double error_ratio = 0.15;  // low accuracy for small delta
+
+    TDigest td(10, 50);
+    for (double value : values) {
+      td.Add(value);
+    }
+    EXPECT_TRUE(td.Verify());
+
+    for (double q : quantiles) {
+      const double truth = ExactQuantile(values, {q})[0];

Review comment:
       If you call `ExactQuantile(values, quantiles)` once, this will spare multiple sortings of the same data.
   
   (this test takes ~400 ms here, which is a bit long)
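
    A sketch of the suggested refactor, assuming the elided loop body compares
    `td.Quantile(q)` against `truth` with a per-quantile tolerance as the
    earlier tests do (`error_ratio` stands in for whatever tolerance factor
    this test block uses): one sort inside a single `ExactQuantile()` call,
    then many lookups.

        // hypothetical reshaping of the `for (double q : quantiles)` loop above
        const std::vector<double> truths = ExactQuantile(values, quantiles);
        for (size_t i = 0; i < quantiles.size(); ++i) {
          const double tolerance = std::fabs(truths[i]) * error_ratio;
          EXPECT_NEAR(td.Quantile(quantiles[i]), truths[i], tolerance) << quantiles[i];
        }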



