You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/11/09 02:14:44 UTC

[incubator-datasketches-postgresql] 01/01: use header-only version of CPC sketch

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch cpc_sketch_header_only
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git

commit b7c90730da91aca7b2dcd5827ea93ddc41145f8c
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Fri Nov 8 18:14:13 2019 -0800

    use header-only version of CPC sketch
---
 Makefile                      |  3 --
 src/allocator.h               |  2 +-
 src/cpc_sketch_c_adapter.cpp  | 79 ++++++++++++++++++++++++-------------------
 src/cpc_sketch_c_adapter.h    | 11 +++---
 src/cpc_sketch_pg_functions.c | 23 +++++++------
 src/global_hooks.c            |  3 ++
 6 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/Makefile b/Makefile
index 2b6e81b..61ddebf 100644
--- a/Makefile
+++ b/Makefile
@@ -35,9 +35,6 @@ OBJS = src/global_hooks.o src/base64.o src/common.o \
 # assume a copy or link datasketches-cpp in the current dir
 CORE = datasketches-cpp
 
-CPC = $(CORE)/cpc/src
-OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
-
 PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include
 PG_CXXFLAGS = -std=c++11
 SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/src/allocator.h b/src/allocator.h
index c66a0a9..1a49f4b 100644
--- a/src/allocator.h
+++ b/src/allocator.h
@@ -50,7 +50,7 @@ public:

   pointer address(reference x) const { return &x; }
   const_pointer address(const_reference x) const {
-    return x;
+    return &x; // must yield a pointer to x, like the non-const overload above
   }

   pointer allocate(size_type n, const_pointer = 0) {
diff --git a/src/cpc_sketch_c_adapter.cpp b/src/cpc_sketch_c_adapter.cpp
index 35de712..3d18828 100644
--- a/src/cpc_sketch_c_adapter.cpp
+++ b/src/cpc_sketch_c_adapter.cpp
@@ -18,126 +18,135 @@
  */
 
 #include "cpc_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
 
 #include <sstream>
 
 #include <cpc_sketch.hpp>
 #include <cpc_union.hpp>
 
+typedef datasketches::cpc_sketch_alloc<palloc_allocator<char>> cpc_sketch_pg;
+typedef datasketches::cpc_union_alloc<palloc_allocator<char>> cpc_union_pg;
+
 void cpc_init() {
-  datasketches::cpc_init(&palloc, &pfree);
+  datasketches::cpc_init<palloc_allocator<char>>(); // header-only API: allocator is a template argument instead of palloc/pfree pointers
 }

 void cpc_cleanup() {
-  datasketches::cpc_cleanup();
 }

 void* cpc_sketch_new(unsigned lg_k) {
   try {
-    return new (palloc(sizeof(datasketches::cpc_sketch))) datasketches::cpc_sketch(lg_k, datasketches::DEFAULT_SEED);
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(lg_k, datasketches::DEFAULT_SEED); // placement-new into palloc'd memory; free with cpc_sketch_delete()
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable(); // NOTE(review): assumes pg_error() does not return (it replaces elog(ERROR)); marks the post-catch fall-through as dead
 }

 void cpc_sketch_delete(void* sketchptr) {
   try {
-    static_cast<datasketches::cpc_sketch*>(sketchptr)->~cpc_sketch();
+    static_cast<cpc_sketch_pg*>(sketchptr)->~cpc_sketch_pg(); // explicit destructor call: sketches are placement-new'ed, so storage is released separately by pfree
     pfree(sketchptr);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }

 void cpc_sketch_update(void* sketchptr, const void* data, unsigned length) {
   try {
-    static_cast<datasketches::cpc_sketch*>(sketchptr)->update(data, length);
+    static_cast<cpc_sketch_pg*>(sketchptr)->update(data, length);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }

 double cpc_sketch_get_estimate(const void* sketchptr) {
   try {
-    return static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate();
+    return static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate();
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) {
   try {
     Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3);
-    est_and_bounds[0] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate());
-    est_and_bounds[1] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_lower_bound(num_std_devs));
-    est_and_bounds[2] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_upper_bound(num_std_devs));
+    est_and_bounds[0] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate());
+    est_and_bounds[1] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_lower_bound(num_std_devs));
+    est_and_bounds[2] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_upper_bound(num_std_devs));
     return est_and_bounds;
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
   try {
     std::stringstream s;
-    s << *(static_cast<const datasketches::cpc_sketch*>(sketchptr));
+    static_cast<const cpc_sketch_pg*>(sketchptr)->to_stream(s); // writes the sketch's text form into s; snprintf below copies it into buffer, truncated to length
     snprintf(buffer, length, "%s", s.str().c_str());
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
-void* cpc_sketch_serialize(const void* sketchptr) {
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size) { // PG callers pass VARHDRSZ and fill in that header afterwards
   try {
-    auto data = static_cast<const datasketches::cpc_sketch*>(sketchptr)->serialize(VARHDRSZ);
-    bytea* buffer = (bytea*) data.first.release();
-    const size_t length = data.second;
-    SET_VARSIZE(buffer, length);
-    return buffer;
+    ptr_with_size p;
+    auto data = static_cast<const cpc_sketch_pg*>(sketchptr)->serialize(header_size);
+    p.ptr = data.first.release(); // ownership of the serialized buffer passes to the caller
+    p.size = data.second;
+    return p;
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }

 void* cpc_sketch_deserialize(const char* buffer, unsigned length) {
   try {
-    auto ptr = datasketches::cpc_sketch::deserialize(buffer, length, datasketches::DEFAULT_SEED);
-    return ptr.release();
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(cpc_sketch_pg::deserialize(buffer, length, datasketches::DEFAULT_SEED)); // builds a sketch from the deserialized value in palloc'd storage; free with cpc_sketch_delete()
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }

 void* cpc_union_new(unsigned lg_k) {
   try {
-    return new (palloc(sizeof(datasketches::cpc_union))) datasketches::cpc_union(lg_k, datasketches::DEFAULT_SEED);
+    return new (palloc(sizeof(cpc_union_pg))) cpc_union_pg(lg_k, datasketches::DEFAULT_SEED); // placement-new into palloc'd memory; free with cpc_union_delete()
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }

 void cpc_union_delete(void* unionptr) {
   try {
-    static_cast<datasketches::cpc_union*>(unionptr)->~cpc_union();
+    static_cast<cpc_union_pg*>(unionptr)->~cpc_union_pg();
     pfree(unionptr);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }

 void cpc_union_update(void* unionptr, const void* sketchptr) {
   try {
-    static_cast<datasketches::cpc_union*>(unionptr)->update(*static_cast<const datasketches::cpc_sketch*>(sketchptr));
+    static_cast<cpc_union_pg*>(unionptr)->update(*static_cast<const cpc_sketch_pg*>(sketchptr));
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }

 void* cpc_union_get_result(void* unionptr) {
   try {
-    auto ptr = static_cast<datasketches::cpc_union*>(unionptr)->get_result();
-    return ptr.release();
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(static_cast<cpc_union_pg*>(unionptr)->get_result()); // result sketch lives in palloc'd storage; free with cpc_sketch_delete()
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
diff --git a/src/cpc_sketch_c_adapter.h b/src/cpc_sketch_c_adapter.h
index 91683ef..338f391 100644
--- a/src/cpc_sketch_c_adapter.h
+++ b/src/cpc_sketch_c_adapter.h
@@ -24,8 +24,6 @@
 extern "C" {
 #endif

-#include <postgres.h>
-
 void cpc_init();
 void cpc_cleanup();

@@ -35,10 +33,15 @@ void cpc_sketch_delete(void* sketchptr);
 void cpc_sketch_update(void* sketchptr, const void* data, unsigned length);
 void cpc_sketch_merge(void* sketchptr1, const void* sketchptr2);
 double cpc_sketch_get_estimate(const void* sketchptr);
-Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
+void** cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs); // palloc'd array of 3 Datums (estimate, lower, upper); void** keeps postgres.h out of this header
 void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);

-void* cpc_sketch_serialize(const void* sketchptr);
+struct ptr_with_size {
+  void* ptr; // serialized buffer; ownership passes to the caller
+  unsigned long long size; // total bytes at ptr, including the header_size prefix
+};
+
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size);
 void* cpc_sketch_deserialize(const char* buffer, unsigned length);

 void* cpc_union_new(unsigned lg_k);
diff --git a/src/cpc_sketch_pg_functions.c b/src/cpc_sketch_pg_functions.c
index c8b6aa9..98000c5 100644
--- a/src/cpc_sketch_pg_functions.c
+++ b/src/cpc_sketch_pg_functions.c
@@ -131,7 +131,7 @@ Datum pg_cpc_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS) {
   sketchptr = cpc_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
   num_std_devs = PG_GETARG_INT32(1);
   if (num_std_devs == 0) num_std_devs = 1; // default
-  est_and_bounds = cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
+  est_and_bounds = (Datum*) cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs); // adapter returns void** so its header need not include postgres.h
   cpc_sketch_delete(sketchptr);

   // construct output array
@@ -190,7 +190,7 @@ Datum pg_cpc_sketch_union_agg(PG_FUNCTION_ARGS) {

 Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;

   MemoryContext oldcontext;
   MemoryContext aggcontext;
@@ -203,12 +203,13 @@ Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
   oldcontext = MemoryContextSwitchTo(aggcontext);

   sketchptr = PG_GETARG_POINTER(0);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size); // serialize() was asked to leave VARHDRSZ bytes in front for this varlena header
 
   MemoryContextSwitchTo(oldcontext);

-  PG_RETURN_BYTEA_P(bytes_out);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }

 Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
@@ -237,7 +238,7 @@ Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
 Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
   void* unionptr;
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;

   MemoryContext oldcontext;
   MemoryContext aggcontext;
@@ -251,13 +252,14 @@ Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {

   unionptr = PG_GETARG_POINTER(0);
   sketchptr = cpc_union_get_result(unionptr);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
   cpc_union_delete(unionptr);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);

   MemoryContextSwitchTo(oldcontext);

-  PG_RETURN_BYTEA_P(bytes_out);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }

 Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
@@ -267,7 +269,7 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
   void* sketchptr2;
   void* unionptr;
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;
   int lg_k;

   lg_k = PG_GETARG_INT32(2);
@@ -286,7 +288,8 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
   }
   sketchptr = cpc_union_get_result(unionptr);
   cpc_union_delete(unionptr);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
-  PG_RETURN_BYTEA_P(bytes_out);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }
diff --git a/src/global_hooks.c b/src/global_hooks.c
index 7f8d529..f1fdfdd 100644
--- a/src/global_hooks.c
+++ b/src/global_hooks.c
@@ -22,6 +22,9 @@

 #include "cpc_sketch_c_adapter.h"

+void _PG_init(void); // explicit prototypes for the module hooks; presumably to satisfy -Wmissing-prototypes - confirm
+void _PG_fini(void);
+
 void _PG_init() {
   cpc_init();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org