You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/06/06 01:45:17 UTC
[incubator-datasketches-postgresql] 03/04: theta draft
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch theta
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git
commit f33856dc13a3aebe433bf6d7c658de0092cc6556
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Jun 3 16:25:16 2019 -0700
theta draft
---
Makefile | 11 +-
sql/datasketches_theta_sketch.sql | 93 +++++++++++++++++
src/theta_sketch_c_adapter.cpp | 145 +++++++++++++++++++++++++
src/theta_sketch_c_adapter.h | 36 +++++++
src/theta_sketch_pg_functions.c | 215 ++++++++++++++++++++++++++++++++++++++
5 files changed, 495 insertions(+), 5 deletions(-)
diff --git a/Makefile b/Makefile
index 6f08583..bcac339 100644
--- a/Makefile
+++ b/Makefile
@@ -3,17 +3,18 @@ MODULE_big = datasketches
+# object files: shared helpers plus, per sketch family, the PostgreSQL functions and the C++ adapter
OBJS = src/base64.o src/common.o \
src/kll_float_sketch_pg_functions.o src/kll_float_sketch_c_adapter.o \
- src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o
+ src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \
+ src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o
-# assume a copy or link sketches-core-cpp in the current dir
-CORE = sketches-core-cpp
+# assume a copy or link datasketches-cpp in the current dir
+CORE = datasketches-cpp
CPC = $(CORE)/cpc/src
OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
-DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql
+# SQL install scripts, one per sketch family
+DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql
CXX = g++-8
-PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include
+# include directories of the datasketches-cpp components in use (kll, common, cpc, theta)
+PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include
SHLIB_LINK = -lstdc++ -L/usr/local/lib
PG_CONFIG = pg_config
diff --git a/sql/datasketches_theta_sketch.sql b/sql/datasketches_theta_sketch.sql
new file mode 100644
index 0000000..f5200ff
--- /dev/null
+++ b/sql/datasketches_theta_sketch.sql
@@ -0,0 +1,93 @@
+-- Copyright 2019, Verizon Media.
+-- Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+
+-- Shell type: declared first so the I/O functions below can reference it.
+CREATE TYPE theta_sketch;
+
+-- Text I/O reuses the generic (not theta-specific) pg_sketch_in / pg_sketch_out entry points.
+CREATE OR REPLACE FUNCTION theta_sketch_in(cstring)
+  RETURNS theta_sketch
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_sketch_in';
+
+CREATE OR REPLACE FUNCTION theta_sketch_out(theta_sketch)
+  RETURNS cstring
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_sketch_out';
+
+-- Complete the type; EXTERNAL storage permits out-of-line storage without compression.
+CREATE TYPE theta_sketch (
+  INPUT = theta_sketch_in,
+  OUTPUT = theta_sketch_out,
+  STORAGE = EXTERNAL
+);
+
+-- Binary-coercible casts: a theta_sketch value shares its representation with bytea.
+CREATE CAST (bytea AS theta_sketch) WITHOUT FUNCTION AS ASSIGNMENT;
+CREATE CAST (theta_sketch AS bytea) WITHOUT FUNCTION AS ASSIGNMENT;
+
+-- Transition functions are called on NULL input: the first call arrives with a
+-- NULL state and creates it. The optional int argument is lg_k.
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement)
+  RETURNS internal
+  IMMUTABLE CALLED ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_add_item';
+
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement, int)
+  RETURNS internal
+  IMMUTABLE CALLED ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_add_item';
+
+-- Distinct-count estimate of a serialized sketch.
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate(theta_sketch)
+  RETURNS double precision
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate';
+
+-- Final functions: turn the aggregate state into a serialized sketch or an estimate.
+CREATE OR REPLACE FUNCTION theta_sketch_from_internal(internal)
+  RETURNS theta_sketch
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_from_internal';
+
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate_from_internal(internal)
+  RETURNS double precision
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate_from_internal';
+
+-- Human-readable summary of a serialized sketch.
+CREATE OR REPLACE FUNCTION theta_sketch_to_string(theta_sketch)
+  RETURNS TEXT
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_to_string';
+
+-- Union transition functions (optional int argument is lg_k).
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch)
+  RETURNS internal
+  IMMUTABLE CALLED ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_union';
+
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch, int)
+  RETURNS internal
+  IMMUTABLE CALLED ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_sketch_union';
+
+CREATE OR REPLACE FUNCTION theta_union_get_result(internal)
+  RETURNS theta_sketch
+  IMMUTABLE RETURNS NULL ON NULL INPUT
+  LANGUAGE C
+  AS '$libdir/datasketches', 'pg_theta_union_get_result';
+
+-- NOTE(review): every final function below frees the aggregate state, so these
+-- aggregates are not safe to use as window aggregates.
+
+-- Distinct-count estimate over a column, optionally with an explicit lg_k.
+CREATE AGGREGATE theta_sketch_distinct(anyelement) (
+  SFUNC = theta_sketch_add_item,
+  STYPE = internal,
+  FINALFUNC = theta_sketch_get_estimate_from_internal
+);
+
+CREATE AGGREGATE theta_sketch_distinct(anyelement, int) (
+  SFUNC = theta_sketch_add_item,
+  STYPE = internal,
+  FINALFUNC = theta_sketch_get_estimate_from_internal
+);
+
+-- Build a serialized (compact) theta sketch from a column.
+CREATE AGGREGATE theta_sketch_build(anyelement) (
+  SFUNC = theta_sketch_add_item,
+  STYPE = internal,
+  FINALFUNC = theta_sketch_from_internal
+);
+
+CREATE AGGREGATE theta_sketch_build(anyelement, int) (
+  SFUNC = theta_sketch_add_item,
+  STYPE = internal,
+  FINALFUNC = theta_sketch_from_internal
+);
+
+-- Union a column of serialized sketches into one sketch.
+CREATE AGGREGATE theta_sketch_union(theta_sketch) (
+  SFUNC = theta_sketch_union,
+  STYPE = internal,
+  FINALFUNC = theta_union_get_result
+);
+
+CREATE AGGREGATE theta_sketch_union(theta_sketch, int) (
+  SFUNC = theta_sketch_union,
+  STYPE = internal,
+  FINALFUNC = theta_union_get_result
+);
diff --git a/src/theta_sketch_c_adapter.cpp b/src/theta_sketch_c_adapter.cpp
new file mode 100644
index 0000000..968204c
--- /dev/null
+++ b/src/theta_sketch_c_adapter.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include "theta_sketch_c_adapter.h"
+#include "allocator.h"
+
+extern "C" {
+#include <postgres.h>
+}
+
+#include <sstream>
+
+#include <theta_sketch.hpp>
+#include <theta_union.hpp>
+
+// Theta sketch / union types instantiated with palloc_allocator (from allocator.h),
+// presumably so the library's internal allocations go through PostgreSQL memory
+// contexts — confirm against allocator.h.
+typedef datasketches::theta_sketch_alloc<palloc_allocator<void>> theta_sketch_pg;
+typedef datasketches::update_theta_sketch_alloc<palloc_allocator<void>> update_theta_sketch_pg;
+typedef datasketches::compact_theta_sketch_alloc<palloc_allocator<void>> compact_theta_sketch_pg;
+typedef datasketches::theta_union_alloc<palloc_allocator<void>> theta_union_pg;
+
+// Creates an update theta sketch with the builder's defaults, placement-constructed
+// in palloc'd memory. Library exceptions are re-raised as PostgreSQL errors.
+void* theta_sketch_new_default() {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().build());
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string ('%' in it would be UB)
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Creates an update theta sketch whose builder is given lg_k via set_lg_k(),
+// placement-constructed in palloc'd memory. Library exceptions become PostgreSQL errors.
+void* theta_sketch_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().set_lg_k(lg_k).build());
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Destroys a sketch through the theta_sketch_pg base type and pfree()s its memory.
+// Used for both update and compact sketches; this relies on theta_sketch_pg having
+// a virtual destructor and on the object living in palloc'd memory — NOTE(review): confirm
+// for sketches returned by theta_sketch_deserialize.
+void theta_sketch_delete(void* sketchptr) {
+  try {
+    static_cast<theta_sketch_pg*>(sketchptr)->~theta_sketch_pg();
+    pfree(sketchptr);
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Feeds `length` raw bytes at `data` into an update sketch.
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length) {
+  try {
+    static_cast<update_theta_sketch_pg*>(sketchptr)->update(data, length);
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Converts an update sketch into a new palloc'd compact sketch. The input update
+// sketch is destroyed and freed, so `sketchptr` must not be used afterwards.
+void* theta_sketch_compact(void* sketchptr) {
+  try {
+    auto newptr = new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<update_theta_sketch_pg*>(sketchptr)->compact());
+    static_cast<update_theta_sketch_pg*>(sketchptr)->~update_theta_sketch_pg();
+    pfree(sketchptr);
+    return newptr;
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Returns the distinct-count estimate of any theta sketch (update or compact).
+double theta_sketch_get_estimate(const void* sketchptr) {
+  try {
+    return static_cast<const theta_sketch_pg*>(sketchptr)->get_estimate();
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Writes the sketch's to_stream() summary into `buffer`, truncated to `length`
+// bytes including the terminating NUL.
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
+  try {
+    std::stringstream s;
+    static_cast<const theta_sketch_pg*>(sketchptr)->to_stream(s);
+    // copy the text verbatim: it must not be interpreted as a format string
+    snprintf(buffer, length, "%s", s.str().c_str());
+  } catch (std::exception& e) {
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Serializes a sketch into a bytea. serialize(VARHDRSZ) is asked to leave
+// VARHDRSZ leading bytes for the varlena header, which is then filled with the
+// returned total length. The buffer comes from the sketch's allocator.
+void* theta_sketch_serialize(const void* sketchptr) {
+  try {
+    auto data = static_cast<const theta_sketch_pg*>(sketchptr)->serialize(VARHDRSZ);
+    bytea* buffer = (bytea*) data.first.release();
+    const size_t length = data.second;
+    SET_VARSIZE(buffer, length);
+    return buffer;
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Reconstructs a sketch from `length` serialized bytes (no varlena header) and
+// hands ownership to the caller, who releases it with theta_sketch_delete().
+void* theta_sketch_deserialize(const char* buffer, unsigned length) {
+  try {
+    auto ptr = theta_sketch_pg::deserialize(buffer, length);
+    return ptr.release();
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Creates a theta union with the builder's defaults in palloc'd memory.
+void* theta_union_new_default() {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().build());
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Creates a theta union whose builder is given lg_k via set_lg_k(), in palloc'd memory.
+void* theta_union_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().set_lg_k(lg_k).build());
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Destroys a union created by theta_union_new*() and pfree()s its memory.
+void theta_union_delete(void* unionptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->~theta_union_pg();
+    pfree(unionptr);
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Merges one sketch into the union; the sketch itself is not modified or freed.
+void theta_union_update(void* unionptr, const void* sketchptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->update(*static_cast<const theta_sketch_pg*>(sketchptr));
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+// Returns the union's current result as a new palloc'd compact sketch; the union
+// itself is left in place (the caller frees both separately).
+void* theta_union_get_result(void* unionptr) {
+  try {
+    return new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<theta_union_pg*>(unionptr)->get_result());
+  } catch (std::exception& e) {
+    // the message is an argument, never the format string
+    elog(ERROR, "%s", e.what());
+  }
+}
diff --git a/src/theta_sketch_c_adapter.h b/src/theta_sketch_c_adapter.h
new file mode 100644
index 0000000..57c3df5
--- /dev/null
+++ b/src/theta_sketch_c_adapter.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#ifndef THETA_SKETCH_C_ADAPTER_H
+#define THETA_SKETCH_C_ADAPTER_H
+
+/*
+ * C interface to the datasketches-cpp theta sketch and theta union.
+ * Sketches and unions are exchanged as opaque pointers; library errors are
+ * reported through elog(ERROR) and do not return to the caller.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* update sketches; lg_k is passed to the builder's set_lg_k() */
+void* theta_sketch_new_default(void);
+void* theta_sketch_new(unsigned lg_k);
+/* destroys and frees any sketch (update or compact) produced by this interface */
+void theta_sketch_delete(void* sketchptr);
+
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length);
+/* returns a new compact sketch; destroys and frees the input update sketch */
+void* theta_sketch_compact(void* sketchptr);
+double theta_sketch_get_estimate(const void* sketchptr);
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
+
+/* serialize returns a bytea (varlena header filled in); deserialize takes the bytes after the header */
+void* theta_sketch_serialize(const void* sketchptr);
+void* theta_sketch_deserialize(const char* buffer, unsigned length);
+
+void* theta_union_new_default(void);
+void* theta_union_new(unsigned lg_k);
+void theta_union_delete(void* unionptr);
+void theta_union_update(void* unionptr, const void* sketchptr);
+/* returns a new compact sketch; the union is left intact */
+void* theta_union_get_result(void* unionptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/theta_sketch_pg_functions.c b/src/theta_sketch_pg_functions.c
new file mode 100644
index 0000000..2c19264
--- /dev/null
+++ b/src/theta_sketch_pg_functions.c
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+
+#include "theta_sketch_c_adapter.h"
+#include "base64.h"
+
+/*
+ * Version-1 calling-convention records. Every C symbol named in an
+ * AS '$libdir/datasketches', '...' clause needs one, otherwise PostgreSQL
+ * refuses to load the function.
+ */
+PG_FUNCTION_INFO_V1(pg_theta_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_to_string);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_union);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_union_get_result);
+
+/* function declarations (only functions actually defined in this file) */
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS);
+
+/*
+ * Aggregate transition function: adds one value to an update theta sketch.
+ * Arguments: 0 = state (NULL on the first call), 1 = value (anyelement),
+ * optional 2 = lg_k, used only when the state is created (0 or NULL = default).
+ * NULL values leave the state unchanged.
+ */
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  int lg_k;
+
+  // anyelement
+  Oid element_type;
+  Datum element;
+  int16 typlen;
+  bool typbyval;
+  char typalign;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_add_item called in non-aggregate context");
+  }
+  // the sketch must outlive this call, so allocate in the aggregate context
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (PG_ARGISNULL(0)) {
+    // only the 3-argument variants supply lg_k; a 2-argument call has no argument 2
+    lg_k = (PG_NARGS() > 2 && !PG_ARGISNULL(2)) ? PG_GETARG_INT32(2) : 0;
+    sketchptr = lg_k ? theta_sketch_new(lg_k) : theta_sketch_new_default();
+  } else {
+    sketchptr = PG_GETARG_POINTER(0);
+  }
+
+  element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
+  element = PG_GETARG_DATUM(1);
+  get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+  if (typlen == -1) {
+    // varlena: the value may be compressed or stored out of line, so detoast first
+    struct varlena* value = PG_DETOAST_DATUM_PACKED(element);
+    theta_sketch_update(sketchptr, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
+    // a detoasted copy lives in the aggregate context; free it rather than leak per row
+    if ((Pointer) value != DatumGetPointer(element)) pfree(value);
+  } else if (typlen == -2) {
+    // null-terminated cstring
+    const char* str = DatumGetCString(element);
+    theta_sketch_update(sketchptr, str, strlen(str));
+  } else if (typbyval) {
+    // fixed-length passed by value
+    theta_sketch_update(sketchptr, &element, typlen);
+  } else {
+    // fixed-length passed by reference
+    theta_sketch_update(sketchptr, (void*)element, typlen);
+  }
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(sketchptr);
+}
+
+/*
+ * theta_sketch_get_estimate(theta_sketch): deserializes the sketch and
+ * returns its distinct-count estimate as double precision.
+ */
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  void* sketch = theta_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  const double result = theta_sketch_get_estimate(sketch);
+
+  theta_sketch_delete(sketch);
+  PG_RETURN_FLOAT8(result);
+}
+
+/*
+ * theta_sketch_to_string(theta_sketch): text summary of a serialized sketch,
+ * truncated to a fixed 1024-byte buffer.
+ */
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS) {
+  char text_buf[1024];
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  void* sketch = theta_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+
+  theta_sketch_to_string(sketch, text_buf, sizeof(text_buf));
+  theta_sketch_delete(sketch);
+  PG_RETURN_TEXT_P(cstring_to_text(text_buf));
+}
+
+/*
+ * Aggregate transition function of theta_sketch_union: merges one serialized
+ * sketch into a theta union held in the aggregate state.
+ * Arguments: 0 = state (NULL on the first call), 1 = theta_sketch,
+ * optional 2 = lg_k, used only when the union is created (0 or NULL = default).
+ */
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
+  void* unionptr;
+  bytea* sketch_bytes;
+  void* sketchptr;
+  int lg_k;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_union called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (PG_ARGISNULL(0)) {
+    // only the 3-argument variant supplies lg_k; a 2-argument call has no argument 2
+    lg_k = (PG_NARGS() > 2 && !PG_ARGISNULL(2)) ? PG_GETARG_INT32(2) : 0;
+    unionptr = lg_k ? theta_union_new(lg_k) : theta_union_new_default();
+  } else {
+    unionptr = PG_GETARG_POINTER(0);
+  }
+
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+  sketchptr = theta_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  theta_union_update(unionptr, sketchptr);
+  theta_sketch_delete(sketchptr);
+  // a detoasted copy was made in the aggregate context; free it rather than leak per row
+  PG_FREE_IF_COPY(sketch_bytes, 1);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(unionptr);
+}
+
+/*
+ * Final function of theta_sketch_build: compacts the aggregate's update sketch,
+ * serializes it to a bytea and destroys the in-memory sketch.
+ * Returns NULL for a NULL state (no non-NULL input).
+ */
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS) {
+  MemoryContext aggcontext;
+  MemoryContext caller_context;
+  void* compact_sketch;
+  bytea* serialized;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_from_internal called in non-aggregate context");
+  }
+  caller_context = MemoryContextSwitchTo(aggcontext);
+
+  // compacting consumes (destroys) the update sketch held in the state
+  compact_sketch = theta_sketch_compact(PG_GETARG_POINTER(0));
+  serialized = theta_sketch_serialize(compact_sketch);
+  theta_sketch_delete(compact_sketch);
+
+  MemoryContextSwitchTo(caller_context);
+  PG_RETURN_BYTEA_P(serialized);
+}
+
+/*
+ * Final function of theta_sketch_distinct: returns the estimate of the
+ * aggregate's update sketch and then destroys the sketch (the state is
+ * consumed). Returns NULL for a NULL state.
+ */
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  double estimate;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_get_estimate_from_internal called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  sketchptr = PG_GETARG_POINTER(0);
+  estimate = theta_sketch_get_estimate(sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_FLOAT8(estimate);
+}
+
+/*
+ * Final function of theta_sketch_union: materializes the union result as a
+ * compact sketch, serializes it to a bytea, then destroys both the result
+ * sketch and the union. Returns NULL for a NULL state.
+ */
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS) {
+  MemoryContext aggcontext;
+  MemoryContext caller_context;
+  void* union_state;
+  void* result_sketch;
+  bytea* serialized;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_union_get_result called in non-aggregate context");
+  }
+  caller_context = MemoryContextSwitchTo(aggcontext);
+
+  union_state = PG_GETARG_POINTER(0);
+  result_sketch = theta_union_get_result(union_state);
+  serialized = theta_sketch_serialize(result_sketch);
+  theta_sketch_delete(result_sketch);
+  theta_union_delete(union_state);
+
+  MemoryContextSwitchTo(caller_context);
+  PG_RETURN_BYTEA_P(serialized);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org