You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/11/09 02:14:44 UTC
[incubator-datasketches-postgresql] 01/01: use header-only version of CPC sketch
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch cpc_sketch_header_only
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git
commit b7c90730da91aca7b2dcd5827ea93ddc41145f8c
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Fri Nov 8 18:14:13 2019 -0800
use header-only version of CPC sketch
---
Makefile | 3 --
src/allocator.h | 2 +-
src/cpc_sketch_c_adapter.cpp | 79 ++++++++++++++++++++++++-------------------
src/cpc_sketch_c_adapter.h | 11 +++---
src/cpc_sketch_pg_functions.c | 23 +++++++------
src/global_hooks.c | 3 ++
6 files changed, 68 insertions(+), 53 deletions(-)
diff --git a/Makefile b/Makefile
index 2b6e81b..61ddebf 100644
--- a/Makefile
+++ b/Makefile
@@ -35,9 +35,6 @@ OBJS = src/global_hooks.o src/base64.o src/common.o \
# assume a copy or link datasketches-cpp in the current dir
CORE = datasketches-cpp
-CPC = $(CORE)/cpc/src
-OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
-
PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include
PG_CXXFLAGS = -std=c++11
SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/src/allocator.h b/src/allocator.h
index c66a0a9..1a49f4b 100644
--- a/src/allocator.h
+++ b/src/allocator.h
@@ -50,7 +50,7 @@ public:
pointer address(reference x) const { return &x; }
const_pointer address(const_reference x) const {
- return x;
+ return &x;
}
pointer allocate(size_type n, const_pointer = 0) {
diff --git a/src/cpc_sketch_c_adapter.cpp b/src/cpc_sketch_c_adapter.cpp
index 35de712..3d18828 100644
--- a/src/cpc_sketch_c_adapter.cpp
+++ b/src/cpc_sketch_c_adapter.cpp
@@ -18,126 +18,135 @@
*/
#include "cpc_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
#include <sstream>
#include <cpc_sketch.hpp>
#include <cpc_union.hpp>
+typedef datasketches::cpc_sketch_alloc<palloc_allocator<char>> cpc_sketch_pg;
+typedef datasketches::cpc_union_alloc<palloc_allocator<char>> cpc_union_pg;
+
void cpc_init() {
- datasketches::cpc_init(&palloc, &pfree);
+ datasketches::cpc_init<palloc_allocator<char>>();
}
void cpc_cleanup() {
- datasketches::cpc_cleanup();
}
void* cpc_sketch_new(unsigned lg_k) {
try {
- return new (palloc(sizeof(datasketches::cpc_sketch))) datasketches::cpc_sketch(lg_k, datasketches::DEFAULT_SEED);
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(lg_k, datasketches::DEFAULT_SEED);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_sketch_delete(void* sketchptr) {
try {
- static_cast<datasketches::cpc_sketch*>(sketchptr)->~cpc_sketch();
+ static_cast<cpc_sketch_pg*>(sketchptr)->~cpc_sketch_pg();
pfree(sketchptr);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void cpc_sketch_update(void* sketchptr, const void* data, unsigned length) {
try {
- static_cast<datasketches::cpc_sketch*>(sketchptr)->update(data, length);
+ static_cast<cpc_sketch_pg*>(sketchptr)->update(data, length);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
double cpc_sketch_get_estimate(const void* sketchptr) {
try {
- return static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate();
+ return static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate();
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) {
try {
Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3);
- est_and_bounds[0] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate());
- est_and_bounds[1] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_lower_bound(num_std_devs));
- est_and_bounds[2] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_upper_bound(num_std_devs));
+ est_and_bounds[0] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate());
+ est_and_bounds[1] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_lower_bound(num_std_devs));
+ est_and_bounds[2] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_upper_bound(num_std_devs));
return est_and_bounds;
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
try {
std::stringstream s;
- s << *(static_cast<const datasketches::cpc_sketch*>(sketchptr));
+ static_cast<const cpc_sketch_pg*>(sketchptr)->to_stream(s);;
snprintf(buffer, length, "%s", s.str().c_str());
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
-void* cpc_sketch_serialize(const void* sketchptr) {
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size) {
try {
- auto data = static_cast<const datasketches::cpc_sketch*>(sketchptr)->serialize(VARHDRSZ);
- bytea* buffer = (bytea*) data.first.release();
- const size_t length = data.second;
- SET_VARSIZE(buffer, length);
- return buffer;
+ ptr_with_size p;
+ auto data = static_cast<const cpc_sketch_pg*>(sketchptr)->serialize(header_size);
+ p.ptr = data.first.release();
+ p.size = data.second;
+ return p;
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void* cpc_sketch_deserialize(const char* buffer, unsigned length) {
try {
- auto ptr = datasketches::cpc_sketch::deserialize(buffer, length, datasketches::DEFAULT_SEED);
- return ptr.release();
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(cpc_sketch_pg::deserialize(buffer, length, datasketches::DEFAULT_SEED));
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void* cpc_union_new(unsigned lg_k) {
try {
- return new (palloc(sizeof(datasketches::cpc_union))) datasketches::cpc_union(lg_k, datasketches::DEFAULT_SEED);
+ return new (palloc(sizeof(cpc_union_pg))) cpc_union_pg(lg_k, datasketches::DEFAULT_SEED);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
void cpc_union_delete(void* unionptr) {
try {
- static_cast<datasketches::cpc_union*>(unionptr)->~cpc_union();
+ static_cast<cpc_union_pg*>(unionptr)->~cpc_union_pg();
pfree(unionptr);
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void cpc_union_update(void* unionptr, const void* sketchptr) {
try {
- static_cast<datasketches::cpc_union*>(unionptr)->update(*static_cast<const datasketches::cpc_sketch*>(sketchptr));
+ static_cast<cpc_union_pg*>(unionptr)->update(*static_cast<const cpc_sketch_pg*>(sketchptr));
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
}
void* cpc_union_get_result(void* unionptr) {
try {
- auto ptr = static_cast<datasketches::cpc_union*>(unionptr)->get_result();
- return ptr.release();
+ return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(static_cast<cpc_union_pg*>(unionptr)->get_result());
} catch (std::exception& e) {
- elog(ERROR, "%s", e.what());
+ pg_error(e.what());
}
+ pg_unreachable();
}
diff --git a/src/cpc_sketch_c_adapter.h b/src/cpc_sketch_c_adapter.h
index 91683ef..338f391 100644
--- a/src/cpc_sketch_c_adapter.h
+++ b/src/cpc_sketch_c_adapter.h
@@ -24,8 +24,6 @@
extern "C" {
#endif
-#include <postgres.h>
-
void cpc_init();
void cpc_cleanup();
@@ -35,10 +33,15 @@ void cpc_sketch_delete(void* sketchptr);
void cpc_sketch_update(void* sketchptr, const void* data, unsigned length);
void cpc_sketch_merge(void* sketchptr1, const void* sketchptr2);
double cpc_sketch_get_estimate(const void* sketchptr);
-Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
+void** cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
-void* cpc_sketch_serialize(const void* sketchptr);
+struct ptr_with_size {
+ void* ptr;
+ unsigned long long size;
+};
+
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size);
void* cpc_sketch_deserialize(const char* buffer, unsigned length);
void* cpc_union_new(unsigned lg_k);
diff --git a/src/cpc_sketch_pg_functions.c b/src/cpc_sketch_pg_functions.c
index c8b6aa9..98000c5 100644
--- a/src/cpc_sketch_pg_functions.c
+++ b/src/cpc_sketch_pg_functions.c
@@ -131,7 +131,7 @@ Datum pg_cpc_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS) {
sketchptr = cpc_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
num_std_devs = PG_GETARG_INT32(1);
if (num_std_devs == 0) num_std_devs = 1; // default
- est_and_bounds = cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
+ est_and_bounds = (Datum*) cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
cpc_sketch_delete(sketchptr);
// construct output array
@@ -190,7 +190,7 @@ Datum pg_cpc_sketch_union_agg(PG_FUNCTION_ARGS) {
Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
MemoryContext oldcontext;
MemoryContext aggcontext;
@@ -203,12 +203,13 @@ Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
oldcontext = MemoryContextSwitchTo(aggcontext);
sketchptr = PG_GETARG_POINTER(0);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
MemoryContextSwitchTo(oldcontext);
- PG_RETURN_BYTEA_P(bytes_out);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
@@ -237,7 +238,7 @@ Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
void* unionptr;
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
MemoryContext oldcontext;
MemoryContext aggcontext;
@@ -251,13 +252,14 @@ Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
unionptr = PG_GETARG_POINTER(0);
sketchptr = cpc_union_get_result(unionptr);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
cpc_union_delete(unionptr);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
MemoryContextSwitchTo(oldcontext);
- PG_RETURN_BYTEA_P(bytes_out);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
@@ -267,7 +269,7 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
void* sketchptr2;
void* unionptr;
void* sketchptr;
- bytea* bytes_out;
+ struct ptr_with_size bytes_out;
int lg_k;
lg_k = PG_GETARG_INT32(2);
@@ -286,7 +288,8 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
}
sketchptr = cpc_union_get_result(unionptr);
cpc_union_delete(unionptr);
- bytes_out = cpc_sketch_serialize(sketchptr);
+ bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
cpc_sketch_delete(sketchptr);
- PG_RETURN_BYTEA_P(bytes_out);
+ SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+ PG_RETURN_BYTEA_P(bytes_out.ptr);
}
diff --git a/src/global_hooks.c b/src/global_hooks.c
index 7f8d529..f1fdfdd 100644
--- a/src/global_hooks.c
+++ b/src/global_hooks.c
@@ -22,6 +22,9 @@
#include "cpc_sketch_c_adapter.h"
+void _PG_init(void);
+void _PG_fini(void);
+
void _PG_init() {
cpc_init();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org