You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2022/04/11 21:03:24 UTC
[datasketches-postgresql] 01/01: quantiles double sketch
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch quantiles
in repository https://gitbox.apache.org/repos/asf/datasketches-postgresql.git
commit 88436b05fd66c456a91ad3c273e1367d5e2ec133
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Apr 11 14:03:01 2022 -0700
quantiles double sketch
---
Makefile | 6 +-
src/quantiles_double_sketch_c_adapter.cpp | 168 +++++++++++++
src/quantiles_double_sketch_c_adapter.h | 52 ++++
src/quantiles_double_sketch_pg_functions.c | 384 +++++++++++++++++++++++++++++
4 files changed, 608 insertions(+), 2 deletions(-)
diff --git a/Makefile b/Makefile
index a4b8555..17376de 100644
--- a/Makefile
+++ b/Makefile
@@ -38,7 +38,8 @@ OBJS = src/global_hooks.o src/base64.o src/common.o \
src/frequent_strings_sketch_pg_functions.o src/frequent_strings_sketch_c_adapter.o \
src/hll_sketch_pg_functions.o src/hll_sketch_c_adapter.o \
src/aod_sketch_pg_functions.o src/aod_sketch_c_adapter.o \
- src/req_float_sketch_pg_functions.o src/req_float_sketch_c_adapter.o
+ src/req_float_sketch_pg_functions.o src/req_float_sketch_c_adapter.o \
+ src/quantiles_double_sketch_pg_functions.o src/quantiles_double_sketch_c_adapter.o
# assume a dir or link named "datasketches-cpp" in the current dir
CORE = datasketches-cpp
@@ -53,7 +54,8 @@ PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(BOOST) -I$(CORE)/common/include \
-I$(CORE)/fi/include \
-I$(CORE)/hll/include \
-I$(CORE)/tuple/include \
- -I$(CORE)/req/include
+ -I$(CORE)/req/include \
+ -I$(CORE)/quantiles/include
PG_CXXFLAGS = -std=c++11
SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/src/quantiles_double_sketch_c_adapter.cpp b/src/quantiles_double_sketch_c_adapter.cpp
new file mode 100644
index 0000000..7e6a103
--- /dev/null
+++ b/src/quantiles_double_sketch_c_adapter.cpp
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "quantiles_double_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
+
+#include <quantiles_sketch.hpp>
+
+using quantiles_double_sketch = datasketches::quantiles_sketch<double, std::less<double>, palloc_allocator<double>>; // sketch of doubles ordered by std::less, allocating through palloc_allocator
+
+/*
+ * Constructs a sketch with the given k inside storage obtained from palloc()
+ * and returns it as an opaque pointer.
+ * A std::exception raised on the way is handed to pg_error(), which is
+ * expected not to return (hence the pg_unreachable() marker).
+ */
+void* quantiles_double_sketch_new(unsigned k) {
+  try {
+    void* const storage = palloc(sizeof(quantiles_double_sketch));
+    return new (storage) quantiles_double_sketch(k);
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Destroys a sketch: the destructor is invoked explicitly on the object and
+ * its storage is then released with pfree().
+ * A std::exception raised on the way is handed to pg_error().
+ */
+void quantiles_double_sketch_delete(void* sketchptr) {
+  try {
+    quantiles_double_sketch* const sketch = static_cast<quantiles_double_sketch*>(sketchptr);
+    sketch->~quantiles_double_sketch();
+    pfree(sketchptr);
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+}
+
+/*
+ * Feeds one value into the sketch behind sketchptr.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+void quantiles_double_sketch_update(void* sketchptr, double value) {
+  try {
+    quantiles_double_sketch& sketch = *static_cast<quantiles_double_sketch*>(sketchptr);
+    sketch.update(value);
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+}
+
+/*
+ * Merges the sketch behind sketchptr2 into the sketch behind sketchptr1.
+ * The source is reached through a pointer-to-const, so it is passed as a
+ * plain const reference: wrapping a const object in std::move() cannot
+ * enable a move and only suggested that the source was being consumed.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+void quantiles_double_sketch_merge(void* sketchptr1, const void* sketchptr2) {
+  try {
+    quantiles_double_sketch& target = *static_cast<quantiles_double_sketch*>(sketchptr1);
+    const quantiles_double_sketch& source = *static_cast<const quantiles_double_sketch*>(sketchptr2);
+    target.merge(source);
+  } catch (std::exception& e) {
+    pg_error(e.what());
+  }
+}
+
+/*
+ * Returns what the sketch's get_rank() reports for the given value.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+double quantiles_double_sketch_get_rank(const void* sketchptr, double value) {
+  try {
+    const quantiles_double_sketch& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_rank(value);
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Returns what the sketch's get_quantile() reports for the given rank.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+double quantiles_double_sketch_get_quantile(const void* sketchptr, double rank) {
+  try {
+    const quantiles_double_sketch& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_quantile(rank);
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Returns the sketch's get_n() value.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+unsigned long long quantiles_double_sketch_get_n(const void* sketchptr) {
+  try {
+    const quantiles_double_sketch& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_n();
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Returns the sketch's to_string() output as a NUL-terminated C string
+ * copied into a buffer obtained from palloc().
+ * A std::exception raised on the way is handed to pg_error().
+ */
+char* quantiles_double_sketch_to_string(const void* sketchptr) {
+  try {
+    const auto text = static_cast<const quantiles_double_sketch*>(sketchptr)->to_string();
+    // one extra byte so the terminating NUL is copied as well
+    const size_t size_with_nul = text.length() + 1;
+    char* const copy = (char*) palloc(size_with_nul);
+    strncpy(copy, text.c_str(), size_with_nul);
+    return copy;
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Serializes the sketch, passing header_size through to the sketch's serialize(),
+ * and returns the address and size of the resulting bytes.
+ * The byte vector is constructed inside palloc'd storage and is not destroyed
+ * in this function, so the returned pointer is still backed by it on return.
+ * A std::exception raised on the way is handed to pg_error().
+ */
+ptr_with_size quantiles_double_sketch_serialize(const void* sketchptr, unsigned header_size) {
+  try {
+    using vector_bytes = quantiles_double_sketch::vector_bytes;
+    const quantiles_double_sketch* const sketch = static_cast<const quantiles_double_sketch*>(sketchptr);
+    void* const storage = palloc(sizeof(vector_bytes));
+    vector_bytes* const bytes = new (storage) vector_bytes(sketch->serialize(header_size));
+    ptr_with_size result;
+    result.ptr = bytes->data();
+    result.size = bytes->size();
+    return result;
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Rebuilds a sketch from length bytes at buffer and returns it, constructed
+ * inside storage obtained from palloc(), as an opaque pointer.
+ * A std::exception raised on the way is handed to pg_error().
+ */
+void* quantiles_double_sketch_deserialize(const char* buffer, unsigned length) {
+  try {
+    void* const storage = palloc(sizeof(quantiles_double_sketch));
+    return new (storage) quantiles_double_sketch(quantiles_double_sketch::deserialize(buffer, length));
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Returns the sketch's get_serialized_size_bytes() value.
+ * A std::exception raised by the sketch is handed to pg_error().
+ */
+unsigned quantiles_double_sketch_get_serialized_size_bytes(const void* sketchptr) {
+  try {
+    const quantiles_double_sketch& sketch = *static_cast<const quantiles_double_sketch*>(sketchptr);
+    return sketch.get_serialized_size_bytes();
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Evaluates the sketch's get_CDF() (is_cdf true) or get_PMF() (is_cdf false)
+ * at the given split points and returns num_split_points + 1 results as a
+ * palloc'd array of float8 Datums.
+ * When scale is true each result is multiplied by the sketch's n.
+ * A std::exception raised on the way is handed to pg_error().
+ */
+Datum* quantiles_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale) {
+  try {
+    const quantiles_double_sketch* const sketch = static_cast<const quantiles_double_sketch*>(sketchptr);
+    auto fractions = is_cdf ?
+      sketch->get_CDF(split_points, num_split_points) :
+      sketch->get_PMF(split_points, num_split_points);
+    const unsigned num_results = num_split_points + 1;
+    Datum* const results = (Datum*) palloc(sizeof(Datum) * num_results);
+    const uint64_t n = sketch->get_n();
+    for (unsigned i = 0; i < num_results; ++i) {
+      results[i] = scale ? pg_float8_get_datum(fractions[i] * n) : pg_float8_get_datum(fractions[i]);
+    }
+    return results;
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
+
+/*
+ * Asks the sketch for the quantiles at the given fractions and returns them
+ * as a palloc'd array of num_fractions float8 Datums.
+ * A std::exception raised on the way is handed to pg_error().
+ */
+Datum* quantiles_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions) {
+  try {
+    const quantiles_double_sketch* const sketch = static_cast<const quantiles_double_sketch*>(sketchptr);
+    auto values = sketch->get_quantiles(fractions, num_fractions);
+    Datum* const results = (Datum*) palloc(sizeof(Datum) * num_fractions);
+    for (unsigned i = 0; i < num_fractions; ++i) {
+      results[i] = pg_float8_get_datum(values[i]);
+    }
+    return results;
+  } catch (std::exception& ex) {
+    pg_error(ex.what());
+  }
+  pg_unreachable();
+}
diff --git a/src/quantiles_double_sketch_c_adapter.h b/src/quantiles_double_sketch_c_adapter.h
new file mode 100644
index 0000000..ba045ff
--- /dev/null
+++ b/src/quantiles_double_sketch_c_adapter.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef QUANTILES_DOUBLE_SKETCH_C_ADAPTER_H
+#define QUANTILES_DOUBLE_SKETCH_C_ADAPTER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "ptr_with_size.h"
+
+static const unsigned DEFAULT_K = 128; /* value of k for callers that are not given an explicit one */
+/* construction and destruction; sketches are passed around as opaque pointers */
+void* quantiles_double_sketch_new(unsigned k);
+void quantiles_double_sketch_delete(void* sketchptr);
+/* updates and scalar queries on an existing sketch */
+void quantiles_double_sketch_update(void* sketchptr, double value);
+void quantiles_double_sketch_merge(void* sketchptr1, const void* sketchptr2);
+double quantiles_double_sketch_get_rank(const void* sketchptr, double value);
+double quantiles_double_sketch_get_quantile(const void* sketchptr, double rank);
+unsigned long long quantiles_double_sketch_get_n(const void* sketchptr);
+char* quantiles_double_sketch_to_string(const void* sketchptr);
+/* conversion to and from bytes */
+struct ptr_with_size quantiles_double_sketch_serialize(const void* sketchptr, unsigned header_size);
+void* quantiles_double_sketch_deserialize(const char* buffer, unsigned length);
+unsigned quantiles_double_sketch_get_serialized_size_bytes(const void* sketchptr);
+/* array-valued queries; NOTE(review): declared as void** here, presumably because Datum is not visible in this header - confirm */
+void** quantiles_double_sketch_get_pmf_or_cdf(const void* sketchptr, const double* split_points, unsigned num_split_points, bool is_cdf, bool scale);
+void** quantiles_double_sketch_get_quantiles(const void* sketchptr, const double* fractions, unsigned num_fractions);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/quantiles_double_sketch_pg_functions.c b/src/quantiles_double_sketch_pg_functions.c
new file mode 100644
index 0000000..6165423
--- /dev/null
+++ b/src/quantiles_double_sketch_pg_functions.c
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+#include <utils/array.h>
+#include <catalog/pg_type.h>
+
+#include "quantiles_double_sketch_c_adapter.h"
+#include "base64.h"
+
+/* PG_FUNCTION_INFO_V1 macro to pass functions to postgres */
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_rank);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_quantile);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_n);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_to_string);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_merge);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_pmf);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_cdf);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_quantiles);
+PG_FUNCTION_INFO_V1(pg_quantiles_double_sketch_get_histogram);
+
+/* function declarations (prototypes for the entry points in this file) */
+Datum pg_quantiles_double_sketch_recv(PG_FUNCTION_ARGS); /* NOTE(review): declared, but not registered or defined in this file */
+Datum pg_quantiles_double_sketch_send(PG_FUNCTION_ARGS); /* NOTE(review): declared, but not registered or defined in this file */
+Datum pg_quantiles_double_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_rank(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_quantile(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_n(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_merge(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_pmf(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_cdf(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_quantiles(PG_FUNCTION_ARGS);
+Datum pg_quantiles_double_sketch_get_histogram(PG_FUNCTION_ARGS);
+
+static const unsigned DEFAULT_NUM_BINS = 10; /* bin count for the histogram function when it is called with a single argument */
+
+/*
+ * Aggregate transition step: adds one float8 value to the sketch state.
+ * Argument 0 is the state (a sketch pointer, or NULL before the first value),
+ * argument 1 is the value and the optional argument 2 is k for a new sketch.
+ * A NULL value leaves the state as it is. The sketch is created and updated
+ * with the aggregate memory context as the current context.
+ */
+Datum pg_quantiles_double_sketch_add_item(PG_FUNCTION_ARGS) {
+  MemoryContext aggcontext;
+  MemoryContext oldcontext;
+  void* sketchptr;
+  double value;
+  int k;
+
+  if (PG_ARGISNULL(1)) {
+    // no update value. return unmodified state (which may itself be NULL)
+    if (PG_ARGISNULL(0)) {
+      PG_RETURN_NULL();
+    }
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0));
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_add_item called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (!PG_ARGISNULL(0)) {
+    sketchptr = PG_GETARG_POINTER(0);
+  } else {
+    // first value: start a new sketch with the requested or default k
+    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
+    sketchptr = quantiles_double_sketch_new(k);
+  }
+
+  value = PG_GETARG_FLOAT8(1);
+  quantiles_double_sketch_update(sketchptr, value);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(sketchptr);
+}
+
+/*
+ * Rank query on a serialized sketch.
+ * Argument 0 is the sketch as bytea, argument 1 the float8 value; the result
+ * is the float8 returned by quantiles_double_sketch_get_rank().
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_rank(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  const double value = PG_GETARG_FLOAT8(1);
+  void* sketchptr = quantiles_double_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  const double rank = quantiles_double_sketch_get_rank(sketchptr, value);
+
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_FLOAT8(rank);
+}
+
+/*
+ * Quantile query on a serialized sketch.
+ * Argument 0 is the sketch as bytea, argument 1 the float8 rank; the result
+ * is the float8 returned by quantiles_double_sketch_get_quantile().
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_quantile(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  const double rank = PG_GETARG_FLOAT8(1);
+  void* sketchptr = quantiles_double_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  const double value = quantiles_double_sketch_get_quantile(sketchptr, rank);
+
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_FLOAT8(value);
+}
+
+/*
+ * Returns, as int64, the n reported by a serialized sketch (argument 0, bytea).
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_n(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  void* sketchptr = quantiles_double_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  const uint64 n = quantiles_double_sketch_get_n(sketchptr);
+
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_INT64(n);
+}
+
+/*
+ * Returns, as text, the string form of a serialized sketch (argument 0, bytea).
+ * The sketch is deserialized for the call and deleted once the string has
+ * been produced.
+ */
+Datum pg_quantiles_double_sketch_to_string(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  void* sketchptr = quantiles_double_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  char* description = quantiles_double_sketch_to_string(sketchptr);
+
+  quantiles_double_sketch_delete(sketchptr);
+  PG_RETURN_TEXT_P(cstring_to_text(description));
+}
+
+/*
+ * Aggregate transition step: merges one serialized sketch into the state.
+ * Argument 0 is the state (a sketch pointer, or NULL before the first input),
+ * argument 1 is the incoming sketch as bytea and the optional argument 2 is k
+ * for a new state sketch. A NULL input leaves the state as it is.
+ * The merge runs with the aggregate memory context as the current context;
+ * the deserialized input sketch is deleted after merging.
+ */
+Datum pg_quantiles_double_sketch_merge(PG_FUNCTION_ARGS) {
+  MemoryContext aggcontext;
+  MemoryContext oldcontext;
+  void* unionptr;
+  void* sketchptr;
+  bytea* sketch_bytes;
+  int k;
+
+  if (PG_ARGISNULL(1)) {
+    // no update value. return unmodified state (which may itself be NULL)
+    if (PG_ARGISNULL(0)) {
+      PG_RETURN_NULL();
+    }
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0));
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_merge called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (!PG_ARGISNULL(0)) {
+    unionptr = PG_GETARG_POINTER(0);
+  } else {
+    // first input: start a new state sketch with the requested or default k
+    k = PG_NARGS() > 2 ? PG_GETARG_INT32(2) : DEFAULT_K;
+    unionptr = quantiles_double_sketch_new(k);
+  }
+
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  quantiles_double_sketch_merge(unionptr, sketchptr);
+  quantiles_double_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(unionptr);
+}
+
+/*
+ * Converts the aggregate state (argument 0, a sketch pointer) into a bytea.
+ * The sketch is serialized with VARHDRSZ passed as the header size, deleted,
+ * and the varlena size is then written into the start of the buffer.
+ * A NULL state yields NULL; calling outside an aggregate raises an error.
+ */
+Datum pg_quantiles_double_sketch_from_internal(PG_FUNCTION_ARGS) {
+  MemoryContext aggcontext;
+  struct ptr_with_size serialized;
+  void* sketchptr;
+
+  if (PG_ARGISNULL(0)) {
+    PG_RETURN_NULL();
+  }
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "quantiles_double_sketch_from_internal called in non-aggregate context");
+  }
+
+  sketchptr = PG_GETARG_POINTER(0);
+  serialized = quantiles_double_sketch_serialize(sketchptr, VARHDRSZ);
+  quantiles_double_sketch_delete(sketchptr);
+
+  SET_VARSIZE(serialized.ptr, serialized.size);
+  PG_RETURN_BYTEA_P(serialized.ptr);
+}
+
+/*
+ * Probability mass function of a serialized sketch over given split points.
+ * Argument 0 is the sketch as bytea, argument 1 an array of split points.
+ * Returns a float8 array with one more element than the input array, filled
+ * by quantiles_double_sketch_get_pmf_or_cdf() with is_cdf = false, scale = false.
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_pmf(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+
+  // input array of split points
+  ArrayType* arr_in;
+  Oid elmtype_in;
+  int16 elmlen_in;
+  bool elmbyval_in;
+  char elmalign_in;
+  Datum* data_in;
+  bool* nulls_in; // NOTE(review): filled by deconstruct_array() but not inspected, so NULL elements are not rejected here
+  int arr_len_in;
+  double* split_points;
+
+  // output array of fractions
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  arr_in = PG_GETARG_ARRAYTYPE_P(1);
+  elmtype_in = ARR_ELEMTYPE(arr_in);
+  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);
+
+  split_points = palloc(sizeof(double) * arr_len_in);
+  for (i = 0; i < arr_len_in; i++) {
+    // the sketch holds doubles, so split points are read as float8 (as the quantiles
+    // function does for its fractions); DatumGetFloat4 would misread a float8 datum.
+    // Assumes the SQL signature declares a float8[] argument - TODO confirm.
+    split_points[i] = DatumGetFloat8(data_in[i]);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, false, false);
+  pfree(split_points);
+
+  // construct output array of fractions
+  arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+
+  quantiles_double_sketch_delete(sketchptr);
+
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+
+/*
+ * Cumulative distribution function of a serialized sketch over given split points.
+ * Argument 0 is the sketch as bytea, argument 1 an array of split points.
+ * Returns a float8 array with one more element than the input array, filled
+ * by quantiles_double_sketch_get_pmf_or_cdf() with is_cdf = true, scale = false.
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_cdf(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+
+  // input array of split points
+  ArrayType* arr_in;
+  Oid elmtype_in;
+  int16 elmlen_in;
+  bool elmbyval_in;
+  char elmalign_in;
+  Datum* data_in;
+  bool* nulls_in; // NOTE(review): filled by deconstruct_array() but not inspected, so NULL elements are not rejected here
+  int arr_len_in;
+  double* split_points;
+
+  // output array of fractions
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  arr_in = PG_GETARG_ARRAYTYPE_P(1);
+  elmtype_in = ARR_ELEMTYPE(arr_in);
+  get_typlenbyvalalign(elmtype_in, &elmlen_in, &elmbyval_in, &elmalign_in);
+  deconstruct_array(arr_in, elmtype_in, elmlen_in, elmbyval_in, elmalign_in, &data_in, &nulls_in, &arr_len_in);
+
+  split_points = palloc(sizeof(double) * arr_len_in);
+  for (i = 0; i < arr_len_in; i++) {
+    // the sketch holds doubles, so split points are read as float8 (as the quantiles
+    // function does for its fractions); DatumGetFloat4 would misread a float8 datum.
+    // Assumes the SQL signature declares a float8[] argument - TODO confirm.
+    split_points[i] = DatumGetFloat8(data_in[i]);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, arr_len_in, true, false);
+  pfree(split_points);
+
+  // construct output array of fractions
+  arr_len_out = arr_len_in + 1; // N split points divide the number line into N+1 intervals
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+
+  quantiles_double_sketch_delete(sketchptr);
+
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
+
+/*
+ * Quantiles of a serialized sketch for an array of fractions.
+ * Argument 0 is the sketch as bytea, argument 1 an array whose elements are
+ * read as float8. Returns a float8 array of the same length, filled by
+ * quantiles_double_sketch_get_quantiles().
+ * The sketch is deserialized for the call and deleted before returning.
+ */
+Datum pg_quantiles_double_sketch_get_quantiles(PG_FUNCTION_ARGS) {
+  const bytea* serialized;
+  void* sketchptr;
+
+  // input side: the array of fractions and its element type description
+  ArrayType* fractions_array;
+  Oid in_type;
+  int16 in_len;
+  bool in_byval;
+  char in_align;
+  Datum* in_datums;
+  bool* in_nulls;
+  int num_fractions;
+  double* fractions;
+
+  // output side: the array of quantiles and its element type description
+  Datum* quantiles;
+  ArrayType* quantiles_array;
+  int16 out_len;
+  bool out_byval;
+  char out_align;
+
+  int idx;
+
+  serialized = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+
+  // unpack the input array into plain doubles
+  fractions_array = PG_GETARG_ARRAYTYPE_P(1);
+  in_type = ARR_ELEMTYPE(fractions_array);
+  get_typlenbyvalalign(in_type, &in_len, &in_byval, &in_align);
+  deconstruct_array(fractions_array, in_type, in_len, in_byval, in_align, &in_datums, &in_nulls, &num_fractions);
+
+  fractions = palloc(sizeof(double) * num_fractions);
+  for (idx = 0; idx < num_fractions; idx++) {
+    fractions[idx] = DatumGetFloat8(in_datums[idx]);
+  }
+  quantiles = (Datum*) quantiles_double_sketch_get_quantiles(sketchptr, fractions, num_fractions);
+  pfree(fractions);
+
+  // wrap the resulting datums in a float8 array
+  get_typlenbyvalalign(FLOAT8OID, &out_len, &out_byval, &out_align);
+  quantiles_array = construct_array(quantiles, num_fractions, FLOAT8OID, out_len, out_byval, out_align);
+
+  quantiles_double_sketch_delete(sketchptr);
+
+  PG_RETURN_ARRAYTYPE_P(quantiles_array);
+}
+
+/*
+ * Histogram of a serialized sketch with equally wide bins.
+ * Argument 0 is the sketch as bytea; the optional argument 1 is the number of
+ * bins (DEFAULT_NUM_BINS when absent, fewer than two is an error).
+ * The range between the quantiles at rank 0 and rank 1 is cut into num_bins
+ * equal intervals, and the PMF over the num_bins - 1 inner split points is
+ * requested with scale = true. Returns a float8 array of num_bins elements.
+ * The sketch is deserialized for the call and deleted before returning.
+ *
+ * All locals are declared at the top of the function, as in the rest of this
+ * file, so the code does not mix declarations with statements.
+ */
+Datum pg_quantiles_double_sketch_get_histogram(PG_FUNCTION_ARGS) {
+  const bytea* bytes_in;
+  void* sketchptr;
+  int num_bins;
+
+  // split points between the bins
+  double* split_points;
+  double min_value;
+  double max_value;
+  double delta;
+
+  // output array of bins
+  Datum* result;
+  ArrayType* arr_out;
+  int16 elmlen_out;
+  bool elmbyval_out;
+  char elmalign_out;
+  int arr_len_out;
+
+  int i;
+
+  bytes_in = PG_GETARG_BYTEA_P(0);
+  sketchptr = quantiles_double_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
+
+  num_bins = PG_NARGS() > 1 ? PG_GETARG_INT32(1) : DEFAULT_NUM_BINS;
+  if (num_bins < 2) {
+    elog(ERROR, "at least two bins expected");
+  }
+
+  split_points = palloc(sizeof(double) * (num_bins - 1));
+  min_value = quantiles_double_sketch_get_quantile(sketchptr, 0);
+  max_value = quantiles_double_sketch_get_quantile(sketchptr, 1);
+  delta = (max_value - min_value) / num_bins; // width of one bin
+  for (i = 0; i < num_bins - 1; i++) {
+    split_points[i] = min_value + delta * (i + 1);
+  }
+  result = (Datum*) quantiles_double_sketch_get_pmf_or_cdf(sketchptr, split_points, num_bins - 1, false, true);
+  pfree(split_points);
+
+  // construct output array
+  arr_len_out = num_bins;
+  get_typlenbyvalalign(FLOAT8OID, &elmlen_out, &elmbyval_out, &elmalign_out);
+  arr_out = construct_array(result, arr_len_out, FLOAT8OID, elmlen_out, elmbyval_out, elmalign_out);
+
+  quantiles_double_sketch_delete(sketchptr);
+
+  PG_RETURN_ARRAYTYPE_P(arr_out);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org