You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/11/09 02:14:43 UTC

[incubator-datasketches-postgresql] branch cpc_sketch_header_only created (now b7c9073)

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a change to branch cpc_sketch_header_only
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git.


      at b7c9073  use header-only version of CPC sketch

This branch includes the following new commits:

     new b7c9073  use header-only version of CPC sketch

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org


[incubator-datasketches-postgresql] 01/01: use header-only version of CPC sketch

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch cpc_sketch_header_only
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git

commit b7c90730da91aca7b2dcd5827ea93ddc41145f8c
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Fri Nov 8 18:14:13 2019 -0800

    use header-only version of CPC sketch
---
 Makefile                      |  3 --
 src/allocator.h               |  2 +-
 src/cpc_sketch_c_adapter.cpp  | 79 ++++++++++++++++++++++++-------------------
 src/cpc_sketch_c_adapter.h    | 11 +++---
 src/cpc_sketch_pg_functions.c | 23 +++++++------
 src/global_hooks.c            |  3 ++
 6 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/Makefile b/Makefile
index 2b6e81b..61ddebf 100644
--- a/Makefile
+++ b/Makefile
@@ -35,9 +35,6 @@ OBJS = src/global_hooks.o src/base64.o src/common.o \
 # assume a copy or link datasketches-cpp in the current dir
 CORE = datasketches-cpp
 
-CPC = $(CORE)/cpc/src
-OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
-
 PG_CPPFLAGS = -fPIC -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include -I$(CORE)/fi/include -I$(CORE)/hll/include
 PG_CXXFLAGS = -std=c++11
 SHLIB_LINK = -lstdc++ -L/usr/local/lib
diff --git a/src/allocator.h b/src/allocator.h
index c66a0a9..1a49f4b 100644
--- a/src/allocator.h
+++ b/src/allocator.h
@@ -50,7 +50,7 @@ public:
 
   pointer address(reference x) const { return &x; }
   const_pointer address(const_reference x) const {
-    return x;
+    return &x;
   }
 
   pointer allocate(size_type n, const_pointer = 0) {
diff --git a/src/cpc_sketch_c_adapter.cpp b/src/cpc_sketch_c_adapter.cpp
index 35de712..3d18828 100644
--- a/src/cpc_sketch_c_adapter.cpp
+++ b/src/cpc_sketch_c_adapter.cpp
@@ -18,126 +18,135 @@
  */
 
 #include "cpc_sketch_c_adapter.h"
+#include "allocator.h"
+#include "postgres_h_substitute.h"
 
 #include <sstream>
 
 #include <cpc_sketch.hpp>
 #include <cpc_union.hpp>
 
+typedef datasketches::cpc_sketch_alloc<palloc_allocator<char>> cpc_sketch_pg;
+typedef datasketches::cpc_union_alloc<palloc_allocator<char>> cpc_union_pg;
+
 void cpc_init() {
-  datasketches::cpc_init(&palloc, &pfree);
+  datasketches::cpc_init<palloc_allocator<char>>();
 }
 
 void cpc_cleanup() {
-  datasketches::cpc_cleanup();
 }
 
 void* cpc_sketch_new(unsigned lg_k) {
   try {
-    return new (palloc(sizeof(datasketches::cpc_sketch))) datasketches::cpc_sketch(lg_k, datasketches::DEFAULT_SEED);
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(lg_k, datasketches::DEFAULT_SEED);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void cpc_sketch_delete(void* sketchptr) {
   try {
-    static_cast<datasketches::cpc_sketch*>(sketchptr)->~cpc_sketch();
+    static_cast<cpc_sketch_pg*>(sketchptr)->~cpc_sketch_pg();
     pfree(sketchptr);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
 void cpc_sketch_update(void* sketchptr, const void* data, unsigned length) {
   try {
-    static_cast<datasketches::cpc_sketch*>(sketchptr)->update(data, length);
+    static_cast<cpc_sketch_pg*>(sketchptr)->update(data, length);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
 double cpc_sketch_get_estimate(const void* sketchptr) {
   try {
-    return static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate();
+    return static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate();
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs) {
   try {
     Datum* est_and_bounds = (Datum*) palloc(sizeof(Datum) * 3);
-    est_and_bounds[0] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_estimate());
-    est_and_bounds[1] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_lower_bound(num_std_devs));
-    est_and_bounds[2] = Float8GetDatum(static_cast<const datasketches::cpc_sketch*>(sketchptr)->get_upper_bound(num_std_devs));
+    est_and_bounds[0] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_estimate());
+    est_and_bounds[1] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_lower_bound(num_std_devs));
+    est_and_bounds[2] = pg_float8_get_datum(static_cast<const cpc_sketch_pg*>(sketchptr)->get_upper_bound(num_std_devs));
     return est_and_bounds;
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
   try {
     std::stringstream s;
-    s << *(static_cast<const datasketches::cpc_sketch*>(sketchptr));
+    static_cast<const cpc_sketch_pg*>(sketchptr)->to_stream(s);;
     snprintf(buffer, length, "%s", s.str().c_str());
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
-void* cpc_sketch_serialize(const void* sketchptr) {
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size) {
   try {
-    auto data = static_cast<const datasketches::cpc_sketch*>(sketchptr)->serialize(VARHDRSZ);
-    bytea* buffer = (bytea*) data.first.release();
-    const size_t length = data.second;
-    SET_VARSIZE(buffer, length);
-    return buffer;
+    ptr_with_size p;
+    auto data = static_cast<const cpc_sketch_pg*>(sketchptr)->serialize(header_size);
+    p.ptr = data.first.release();
+    p.size = data.second;
+    return p;
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void* cpc_sketch_deserialize(const char* buffer, unsigned length) {
   try {
-    auto ptr = datasketches::cpc_sketch::deserialize(buffer, length, datasketches::DEFAULT_SEED);
-    return ptr.release();
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(cpc_sketch_pg::deserialize(buffer, length, datasketches::DEFAULT_SEED));
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void* cpc_union_new(unsigned lg_k) {
   try {
-    return new (palloc(sizeof(datasketches::cpc_union))) datasketches::cpc_union(lg_k, datasketches::DEFAULT_SEED);
+    return new (palloc(sizeof(cpc_union_pg))) cpc_union_pg(lg_k, datasketches::DEFAULT_SEED);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
 
 void cpc_union_delete(void* unionptr) {
   try {
-    static_cast<datasketches::cpc_union*>(unionptr)->~cpc_union();
+    static_cast<cpc_union_pg*>(unionptr)->~cpc_union_pg();
     pfree(unionptr);
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
 void cpc_union_update(void* unionptr, const void* sketchptr) {
   try {
-    static_cast<datasketches::cpc_union*>(unionptr)->update(*static_cast<const datasketches::cpc_sketch*>(sketchptr));
+    static_cast<cpc_union_pg*>(unionptr)->update(*static_cast<const cpc_sketch_pg*>(sketchptr));
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
 }
 
 void* cpc_union_get_result(void* unionptr) {
   try {
-    auto ptr = static_cast<datasketches::cpc_union*>(unionptr)->get_result();
-    return ptr.release();
+    return new (palloc(sizeof(cpc_sketch_pg))) cpc_sketch_pg(static_cast<cpc_union_pg*>(unionptr)->get_result());
   } catch (std::exception& e) {
-    elog(ERROR, "%s", e.what());
+    pg_error(e.what());
   }
+  pg_unreachable();
 }
diff --git a/src/cpc_sketch_c_adapter.h b/src/cpc_sketch_c_adapter.h
index 91683ef..338f391 100644
--- a/src/cpc_sketch_c_adapter.h
+++ b/src/cpc_sketch_c_adapter.h
@@ -24,8 +24,6 @@
 extern "C" {
 #endif
 
-#include <postgres.h>
-
 void cpc_init();
 void cpc_cleanup();
 
@@ -35,10 +33,15 @@ void cpc_sketch_delete(void* sketchptr);
 void cpc_sketch_update(void* sketchptr, const void* data, unsigned length);
 void cpc_sketch_merge(void* sketchptr1, const void* sketchptr2);
 double cpc_sketch_get_estimate(const void* sketchptr);
-Datum* cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
+void** cpc_sketch_get_estimate_and_bounds(const void* sketchptr, unsigned num_std_devs);
 void cpc_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
 
-void* cpc_sketch_serialize(const void* sketchptr);
+struct ptr_with_size {
+  void* ptr;
+  unsigned long long size;
+};
+
+struct ptr_with_size cpc_sketch_serialize(const void* sketchptr, unsigned header_size);
 void* cpc_sketch_deserialize(const char* buffer, unsigned length);
 
 void* cpc_union_new(unsigned lg_k);
diff --git a/src/cpc_sketch_pg_functions.c b/src/cpc_sketch_pg_functions.c
index c8b6aa9..98000c5 100644
--- a/src/cpc_sketch_pg_functions.c
+++ b/src/cpc_sketch_pg_functions.c
@@ -131,7 +131,7 @@ Datum pg_cpc_sketch_get_estimate_and_bounds(PG_FUNCTION_ARGS) {
   sketchptr = cpc_sketch_deserialize(VARDATA(bytes_in), VARSIZE(bytes_in) - VARHDRSZ);
   num_std_devs = PG_GETARG_INT32(1);
   if (num_std_devs == 0) num_std_devs = 1; // default
-  est_and_bounds = cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
+  est_and_bounds = (Datum*) cpc_sketch_get_estimate_and_bounds(sketchptr, num_std_devs);
   cpc_sketch_delete(sketchptr);
 
   // construct output array
@@ -190,7 +190,7 @@ Datum pg_cpc_sketch_union_agg(PG_FUNCTION_ARGS) {
 
 Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;
 
   MemoryContext oldcontext;
   MemoryContext aggcontext;
@@ -203,12 +203,13 @@ Datum pg_cpc_sketch_from_internal(PG_FUNCTION_ARGS) {
   oldcontext = MemoryContextSwitchTo(aggcontext);
 
   sketchptr = PG_GETARG_POINTER(0);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
 
   MemoryContextSwitchTo(oldcontext);
 
-  PG_RETURN_BYTEA_P(bytes_out);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }
 
 Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
@@ -237,7 +238,7 @@ Datum pg_cpc_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
 Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
   void* unionptr;
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;
 
   MemoryContext oldcontext;
   MemoryContext aggcontext;
@@ -251,13 +252,14 @@ Datum pg_cpc_union_get_result(PG_FUNCTION_ARGS) {
 
   unionptr = PG_GETARG_POINTER(0);
   sketchptr = cpc_union_get_result(unionptr);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
   cpc_union_delete(unionptr);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
 
   MemoryContextSwitchTo(oldcontext);
 
-  PG_RETURN_BYTEA_P(bytes_out);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }
 
 Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
@@ -267,7 +269,7 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
   void* sketchptr2;
   void* unionptr;
   void* sketchptr;
-  bytea* bytes_out;
+  struct ptr_with_size bytes_out;
   int lg_k;
 
   lg_k = PG_GETARG_INT32(2);
@@ -286,7 +288,8 @@ Datum pg_cpc_sketch_union(PG_FUNCTION_ARGS) {
   }
   sketchptr = cpc_union_get_result(unionptr);
   cpc_union_delete(unionptr);
-  bytes_out = cpc_sketch_serialize(sketchptr);
+  bytes_out = cpc_sketch_serialize(sketchptr, VARHDRSZ);
   cpc_sketch_delete(sketchptr);
-  PG_RETURN_BYTEA_P(bytes_out);
+  SET_VARSIZE(bytes_out.ptr, bytes_out.size);
+  PG_RETURN_BYTEA_P(bytes_out.ptr);
 }
diff --git a/src/global_hooks.c b/src/global_hooks.c
index 7f8d529..f1fdfdd 100644
--- a/src/global_hooks.c
+++ b/src/global_hooks.c
@@ -22,6 +22,9 @@
 
 #include "cpc_sketch_c_adapter.h"
 
+void _PG_init(void);
+void _PG_fini(void);
+
 void _PG_init() {
   cpc_init();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org