You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2022/04/11 21:03:24 UTC

[datasketches-postgresql] 01/01: quantiles double sketch

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch quantiles
in repository https://gitbox.apache.org/repos/asf/datasketches-postgresql.git

commit 88436b05fd66c456a91ad3c273e1367d5e2ec133
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Apr 11 14:03:01 2022 -0700

    quantiles double sketch
---
 Makefile                                   |   6 +-
 src/quantiles_double_sketch_c_adapter.cpp  | 168 +++++++++++++
 src/quantiles_double_sketch_c_adapter.h    |  52 ++++
 src/quantiles_double_sketch_pg_functions.c | 384 +++++++++++++++++++++++++++++
 4 files changed, 608 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index a4b8555..17376de 100644
--- a/Makefile
+++ b/Makefile
@@ -38,7 +38,8 @@ OBJS = src/global_hooks.o src/base64.o src/common.o \
   src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o \
   src/hll_sketch_pg_functions.o src/hll_sketch_c_adapter.o \
   src/aod_sketch_pg_functions.o src/aod_sketch_c_adapter.o \
-  src/req_float_sketch_pg_functions.o src/req_float_sketch_c_adapter.o
+  src/req_float_sketch_pg_functions.o src/req_float_sketch_c_adapter.o \
+  src/quantiles_double_sketch_pg_functions.o src/quantiles_double_sketch_c_adapter.o
 
 # assume a dir or link named "datasketches-cpp" in the current dir
 CORE = datasketches-cpp
@@ -53,7 +54,8 @@ PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(BOOST) -I$(CORE)/common/include \
   -I$(CORE)/fi/include \
   -I$(CORE)/hll/include \
   -I$(CORE)/tuple/include \
-  -I$(CORE)/req/include
+  -I$(CORE)/req/include \
+  -I$(CORE)/quantiles/include
 PG_CXXFLAGS = -std=c++11
 SHLIB_LINK = -lstdc++ -L/usr/local/lib
 
diff --git a/src/quantiles_double_sketch_c_adapter.cpp b/src/quantiles_double_sketch_c_adapter.cpp
new file mode 100644
index 0000000..7e6a103
--- /dev/null
+++ b/src/quantiles_double_sketch_c_adapter.cpp
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "quantiles_double_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
+
+#include <quantiles_sketch.hpp>
+
+using quantiles_double_sketch = datasketches::quantiles_sketch<double, std::less<double>, palloc_allocator<double>>; // sketch of doubles, std::less ordering, palloc_allocator as allocator
+
+// Constructs a sketch with parameter k in storage obtained from palloc(); std::exception goes to pg_error().
+void* quantiles_double_sketch_new(unsigned k) {
+  try {
+    void* const storage = palloc(sizeof(quantiles_double_sketch));
+    return new (storage) quantiles_double_sketch(k);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Runs the sketch destructor explicitly, then hands the storage back with pfree().
+void quantiles_double_sketch_delete(void* sketchptr) {
+  try {
+    auto* const sketch = static_cast<quantiles_double_sketch*>(sketchptr);
+    sketch->~quantiles_double_sketch();
+    pfree(sketchptr);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+}
+
+// Feeds a single value into the sketch; std::exception is reported through pg_error().
+void quantiles_double_sketch_update(void* sketchptr, double value) {
+  try {
+    auto& sketch = *static_cast<quantiles_double_sketch*>(sketchptr);
+    sketch.update(value);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+}
+
+// Merges the sketch at sketchptr2 into the sketch at sketchptr1; sketchptr2 is only read through a const reference.
+void quantiles_double_sketch_merge(void* sketchptr1, const void* sketchptr2) {
+  try {
+    const auto& source = *static_cast<const quantiles_double_sketch*>(sketchptr2); // const source: std::move could never move it
+    static_cast<quantiles_double_sketch*>(sketchptr1)->merge(source);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+}
+
+// Returns the sketch's get_rank() result for value; std::exception is reported through pg_error().
+double quantiles_double_sketch_get_rank(const void* sketchptr, double value) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_rank(value);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Returns the sketch's get_quantile() result for rank; std::exception is reported through pg_error().
+double quantiles_double_sketch_get_quantile(const void* sketchptr, double rank) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_quantile(rank);
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Returns the sketch's get_n() value as unsigned long long; std::exception is reported through pg_error().
+unsigned long long quantiles_double_sketch_get_n(const void* sketchptr) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_n();
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Copies the sketch's to_string() text, including the terminating NUL, into a palloc'ed buffer.
+char* quantiles_double_sketch_to_string(const void* sketchptr) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    const auto text = sketch.to_string();
+    const size_t size_with_nul = text.length() + 1;
+    char* const copy = static_cast<char*>(palloc(size_with_nul));
+    strncpy(copy, text.c_str(), size_with_nul);
+    return copy;
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Serializes the sketch (header_size is forwarded to serialize()) and returns the byte buffer's pointer and size.
+ptr_with_size quantiles_double_sketch_serialize(const void* sketchptr, unsigned header_size) {
+  try {
+    using bytes_type = quantiles_double_sketch::vector_bytes;
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    // the vector is built in palloc'ed storage and never destroyed here, so its buffer is still alive after return
+    auto* const holder = new (palloc(sizeof(bytes_type))) bytes_type(sketch.serialize(header_size));
+    ptr_with_size result;
+    result.ptr = holder->data();
+    result.size = holder->size();
+    return result;
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Rebuilds a sketch from length bytes at buffer and places it in storage obtained from palloc().
+void* quantiles_double_sketch_deserialize(const char* buffer, unsigned length) {
+  try {
+    void* const storage = palloc(sizeof(quantiles_double_sketch));
+    return new (storage) quantiles_double_sketch(quantiles_double_sketch::deserialize(buffer, length));
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Returns the sketch's get_serialized_size_bytes() value; std::exception is reported through pg_error().
+unsigned quantiles_double_sketch_get_serialized_size_bytes(const void* sketchptr) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_serialized_size_bytes();
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Returns num_split_points + 1 float8 Datums in a palloc'ed array: the CDF if is_cdf, otherwise the PMF.
+// NOTE(review): the C header declares the return type as void** - confirm it is compatible with Datum*.
+Datum* quantiles_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    const unsigned num_values = num_split_points + 1;
+    auto fractions = is_cdf
+      ? sketch.get_CDF(split_points, num_split_points)
+      : sketch.get_PMF(split_points, num_split_points);
+    Datum* const values = static_cast<Datum*>(palloc(sizeof(Datum) * num_values));
+    const uint64_t n = sketch.get_n();
+    // with scale set, every fraction is multiplied by n before it is converted to a Datum
+    for (unsigned idx = 0; idx < num_values; ++idx) {
+      const double fraction = fractions[idx];
+      values[idx] = pg_float8_get_datum(scale ? fraction * n : fraction);
+    }
+    return values;
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
+
+// Returns num_fractions float8 Datums in a palloc'ed array: the get_quantiles() results for the given fractions.
+Datum* quantiles_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions) {
+  try {
+    const auto& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    auto values = sketch.get_quantiles(fractions, num_fractions);
+    Datum* const datums = static_cast<Datum*>(palloc(sizeof(Datum) * num_fractions));
+    for (unsigned idx = 0; idx < num_fractions; ++idx) {
+      datums[idx] = pg_float8_get_datum(values[idx]);
+    }
+    return datums;
+  } catch (const std::exception& ex) { pg_error(ex.what()); }
+  pg_unreachable();
+}
diff --git a/src/quantiles_double_sketch_c_adapter.h b/src/quantiles_double_sketch_c_adapter.h
new file mode 100644
index 0000000..ba045ff
--- /dev/null
+++ b/src/quantiles_double_sketch_c_adapter.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef QUANTILES_DOUBLE_SKETCH_C_ADAPTER_H
+#define QUANTILES_DOUBLE_SKETCH_C_ADAPTER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ptr_with_size.h"
+
+static const unsigned DEFAULT_K = 128; /* k used by the PostgreSQL functions when the caller passes none */
+/* lifecycle: sketches cross this interface as opaque void* handles */
+void* quantiles_double_sketch_new(unsigned k);
+void quantiles_double_sketch_delete(void* sketchptr);
+/* updates and queries */
+void quantiles_double_sketch_update(void* sketchptr, double value);
+void quantiles_double_sketch_merge(void* sketchptr1, const void* sketchptr2);
+double quantiles_double_sketch_get_rank(const void* sketchptr, double value);
+double quantiles_double_sketch_get_quantile(const void* sketchptr, double rank);
+unsigned long long quantiles_double_sketch_get_n(const void* sketchptr);
+char* quantiles_double_sketch_to_string(const void* sketchptr);
+/* serialization */
+struct ptr_with_size quantiles_double_sketch_serialize(const void* sketchptr, unsigned header_size);
+void* quantiles_double_sketch_deserialize(const char* buffer, unsigned length);
+unsigned quantiles_double_sketch_get_serialized_size_bytes(const void* sketchptr);
+/* array results; NOTE(review): declared void** here while the C++ definitions return Datum* - confirm they match */
+void** quantiles_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale);
+void** quantiles_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/quantiles_double_sketch_pg_functions.c b/src/quantiles_double_sketch_pg_functions.c
new file mode 100644
index 0000000..6165423
--- /dev/null
+++ b/src/quantiles_double_sketch_pg_functions.c
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+#include <utils/array.h>
+#include <catalog/pg_type.h>
+
+#include "quantiles_double_sketch_c_adapter.h"
+#include "base64.h"
+
+/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres: one entry per SQL-callable function defined in this file */
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_rank);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_quantile);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_n);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_to_string);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_merge);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_pmf);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_cdf);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_quantiles);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_histogram);
+
+/* function declarations */
+Datum pg_quantiles_double_sketch_recv(PG_FUNCTION_ARGS); /* NOTE(review): declared but not defined in this file */
+Datum pg_quantiles_double_sketch_send(PG_FUNCTION_ARGS); /* NOTE(review): declared but not defined in this file */
+Datum pg_quantiles_double_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_rank(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_quantile(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_n(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_merge(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_pmf(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_cdf(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_quantiles(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_histogram(PG_FUNCTION_ARGS);
+
+static const unsigned DEFAULT_NUM_BINS = 10; /* bin count used by get_histogram when argument 1 is not passed */
+/* Aggregate step: adds the float8 value in argument 1 to the sketch state in argument 0 and returns the state. */
+Datum pg_quantiles_double_sketch_add_item(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  double value;
+  int k;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_add_item called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+  // aggcontext is current from here on: a NULL state means first call, so create the sketch (k from argument 2 if passed)
+  if (PG_ARGISNULL(0)) {
+    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
+    sketchptr = quantiles_double_sketch_new(k);
+  } else {
+    sketchptr = PG_GETARG_POINTER(0);
+  }
+
+  value = PG_GETARG_FLOAT8(1);
+  quantiles_double_sketch_update(sketchptr, value);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(sketchptr);
+}
+/* Returns the rank that the serialized sketch in argument 0 reports for the float8 value in argument 1. */
+Datum pg_quantiles_double_sketch_get_rank(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  double value;
+  double rank;
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); // bytes after the varlena header
+  value = PG_GETARG_FLOAT8(1);
+  rank = quantiles_double_sketch_get_rank(sketchptr, value);
+  quantiles_double_sketch_delete(sketchptr); // the deserialized sketch is only used within this call
+  PG_RETURN_FLOAT8(rank);
+}
+/* Returns the quantile that the serialized sketch in argument 0 reports for the float8 rank in argument 1. */
+Datum pg_quantiles_double_sketch_get_quantile(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  double value;
+  double rank;
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); // bytes after the varlena header
+  rank = PG_GETARG_FLOAT8(1);
+  value = quantiles_double_sketch_get_quantile(sketchptr, rank);
+  quantiles_double_sketch_delete(sketchptr); // the deserialized sketch is only used within this call
+  PG_RETURN_FLOAT8(value);
+}
+/* Returns the n value of the serialized sketch in argument 0. */
+Datum pg_quantiles_double_sketch_get_n(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  uint64 n;
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); // bytes after the varlena header
+  n = quantiles_double_sketch_get_n(sketchptr);
+  quantiles_double_sketch_delete(sketchptr); // the deserialized sketch is only used within this call
+  PG_RETURN_INT64(n); // note: the unsigned 64-bit n is returned through the signed int64 macro
+}
+/* Returns the adapter's to_string() output for the serialized sketch in argument 0 as a text value. */
+Datum pg_quantiles_double_sketch_to_string(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  char* str;
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ); // bytes after the varlena header
+  str = quantiles_double_sketch_to_string(sketchptr); // NUL-terminated copy made by the adapter
+  quantiles_double_sketch_delete(sketchptr); // str is a separate buffer, so the sketch can go first
+  PG_RETURN_TEXT_P(cstring_to_text(str));
+}
+/* Aggregate step: merges the serialized sketch in argument 1 into the sketch state in argument 0 and returns the state. */
+Datum pg_quantiles_double_sketch_merge(PG_FUNCTION_ARGS) {
+  void* unionptr;
+  bytea* sketch_bytes;
+  void* sketchptr;
+  int k;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_merge called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+  // aggcontext is current from here on: a NULL state means first call, so create the union sketch (k from argument 2 if passed)
+  if (PG_ARGISNULL(0)) {
+    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
+    unionptr = quantiles_double_sketch_new(k);
+  } else {
+    unionptr = PG_GETARG_POINTER(0);
+  }
+  // the input sketch is deserialized, merged into the state and deleted again right away
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  quantiles_double_sketch_merge(unionptr, sketchptr);
+  quantiles_double_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(unionptr);
+}
+/* Serializes the sketch state in argument 0 into a bytea, deletes the state, and returns the bytea (NULL state gives NULL). */
+Datum pg_quantiles_double_sketch_from_internal(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  struct ptr_with_size bytes_out;
+  MemoryContext aggcontext;
+  // aggcontext is only fetched for the aggregate-context check below; no context switch happens here
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_from_internal called in non-aggregate context");
+  }
+  sketchptr = PG_GETARG_POINTER(0);
+  bytes_out = quantiles_double_sketch_serialize(sketchptr, VARHDRSZ); // presumably leaves VARHDRSZ bytes in front for the varlena header - confirm
+  quantiles_double_sketch_delete(sketchptr);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
+}
+/* Returns the PMF of the serialized sketch in argument 0 at the split points in argument 1 (real[] or double precision[]). */
+Datum pg_quantiles_double_sketch_get_pmf(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  // input array of split points
+  ArrayType* arr_in;
+  Oid elmtype_in;
+  int16 elmlen_in;
+  bool elmbyval_in;
+  char elmalign_in;
+  Datum* data_in;
+  bool* nulls_in;
+  int arr_len_in;
+  double* split_points;
+
+  // output array of fractions
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  arr_in = PG_GETARG_ARRAYTYPE_P(1);
+  elmtype_in = ARR_ELEMTYPE(arr_in);
+  if (elmtype_in != FLOAT4OID && elmtype_in != FLOAT8OID) {
+    elog(ERROR, "split points must be an array of real or double precision");
+  }
+  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);
+
+  // decode each Datum by the array's own element type (float8 elements must not go through DatumGetFloat4)
+  // NOTE(review): nulls_in is not consulted, so NULL elements are not rejected here
+  split_points = palloc(sizeof(double) * arr_len_in);
+  for (i = 0; i < arr_len_in; i++) {
+    split_points[i] = elmtype_in == FLOAT4OID ? DatumGetFloat4(data_in[i]) : DatumGetFloat8(data_in[i]);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, false, false);
+  pfree(split_points);
+  // construct output array of fractions
+  arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+/* Returns the CDF of the serialized sketch in argument 0 at the split points in argument 1 (real[] or double precision[]). */
+Datum pg_quantiles_double_sketch_get_cdf(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  // input array of split points
+  ArrayType* arr_in;
+  Oid elmtype_in;
+  int16 elmlen_in;
+  bool elmbyval_in;
+  char elmalign_in;
+  Datum* data_in;
+  bool* nulls_in;
+  int arr_len_in;
+  double* split_points;
+
+  // output array of fractions
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  arr_in = PG_GETARG_ARRAYTYPE_P(1);
+  elmtype_in = ARR_ELEMTYPE(arr_in);
+  if (elmtype_in != FLOAT4OID && elmtype_in != FLOAT8OID) {
+    elog(ERROR, "split points must be an array of real or double precision");
+  }
+  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);
+
+  // decode each Datum by the array's own element type (float8 elements must not go through DatumGetFloat4)
+  // NOTE(review): nulls_in is not consulted, so NULL elements are not rejected here
+  split_points = palloc(sizeof(double) * arr_len_in);
+  for (i = 0; i < arr_len_in; i++) {
+    split_points[i] = elmtype_in == FLOAT4OID ? DatumGetFloat4(data_in[i]) : DatumGetFloat8(data_in[i]);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, true, false);
+  pfree(split_points);
+  // construct output array of fractions
+  arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+/* Returns a float8 array of quantiles of the serialized sketch in argument 0, one per fraction in the array in argument 1. */
+Datum pg_quantiles_double_sketch_get_quantiles(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+
+  // input array of fractions
+  ArrayType* arr_in;
+  Oid elmtype_in;
+  int16 elmlen_in;
+  bool elmbyval_in;
+  char elmalign_in;
+  Datum* data_in;
+  bool* nulls_in;
+  int arr_len;
+  double* fractions;
+
+  // output array of quantiles
+  Datum* quantiles;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  arr_in = PG_GETARG_ARRAYTYPE_P(1);
+  elmtype_in = ARR_ELEMTYPE(arr_in);
+  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len);
+  // NOTE(review): elements are read as float8 whatever elmtype_in is, and nulls_in is not consulted
+  fractions = palloc(sizeof(double) * arr_len);
+  for (i = 0; i < arr_len; i++) {
+    fractions[i] = DatumGetFloat8(data_in[i]);
+  }
+  quantiles = (Datum*) quantiles_double_sketch_get_quantiles(sketchptr, fractions, arr_len);
+  pfree(fractions);
+
+  // construct output array of quantiles
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(quantiles, arr_len, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+
+  quantiles_double_sketch_delete(sketchptr);
+
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+/* Returns a float8 array of num_bins scaled PMF values over equal-width bins spanning the sketch's 0- and 1-quantiles. */
+Datum pg_quantiles_double_sketch_get_histogram(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  int num_bins;
+  double* split_points;
+  double min_value;
+  double max_value;
+  double delta;
+
+  // output array of bins
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+  num_bins = PG_NARGS() > 1 ? PG_GETARG_INT32(1) : DEFAULT_NUM_BINS;
+  if (num_bins < 2) {
+    elog(ERROR, "at least two bins expected");
+  }
+
+  split_points = palloc(sizeof(double) * (num_bins - 1));
+  min_value = quantiles_double_sketch_get_quantile(sketchptr, 0);
+  max_value = quantiles_double_sketch_get_quantile(sketchptr, 1);
+  delta = (max_value - min_value) / num_bins;
+  for (i = 0; i < num_bins - 1; i++) {
+    split_points[i] = min_value + delta * (i + 1);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, num_bins - 1, false, true); // scale = true: PMF times n
+  pfree(split_points);
+
+  // construct output array
+  arr_len_out = num_bins;
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org