You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/06/06 01:45:17 UTC

[incubator-datasketches-postgresql] 03/04: theta draft

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch theta
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-postgresql.git

commit f33856dc13a3aebe433bf6d7c658de0092cc6556
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Jun 3 16:25:16 2019 -0700

    theta draft
---
 Makefile                          |  11 +-
 sql/datasketches_theta_sketch.sql |  93 +++++++++++++++++
 src/theta_sketch_c_adapter.cpp    | 145 +++++++++++++++++++++++++
 src/theta_sketch_c_adapter.h      |  36 +++++++
 src/theta_sketch_pg_functions.c   | 215 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 495 insertions(+), 5 deletions(-)

diff --git a/Makefile b/Makefile
index 6f08583..bcac339 100644
--- a/Makefile
+++ b/Makefile
@@ -3,17 +3,18 @@ MODULE_big = datasketches
 
 OBJS = src/base64.o src/common.o \
   src/kll_float_sketch_pg_functions.o src/kll_float_sketch_c_adapter.o \
-  src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o
+  src/cpc_sketch_pg_functions.o src/cpc_sketch_c_adapter.o \
+  src/theta_sketch_pg_functions.o src/theta_sketch_c_adapter.o
 
-# assume a copy or link sketches-core-cpp in the current dir
-CORE = sketches-core-cpp
+# assumes a copy of (or a link to) datasketches-cpp in the current dir
+CORE = datasketches-cpp
 CPC = $(CORE)/cpc/src
 OBJS += $(CPC)/cpc_sketch.o $(CPC)/fm85.o $(CPC)/fm85Compression.o $(CPC)/fm85Confidence.o $(CPC)/fm85Merging.o $(CPC)/fm85Util.o $(CPC)/iconEstimator.o $(CPC)/u32Table.o
 
-DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql
+DATA = sql/datasketches_cpc_sketch.sql sql/datasketches_kll_float_sketch.sql sql/datasketches_theta_sketch.sql
 
 CXX = g++-8
-PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include
+PG_CPPFLAGS = -I/usr/local/include -I$(CORE)/kll/include -I$(CORE)/common/include -I$(CORE)/cpc/include -I$(CORE)/theta/include
 SHLIB_LINK = -lstdc++ -L/usr/local/lib
 
 PG_CONFIG = pg_config
diff --git a/sql/datasketches_theta_sketch.sql b/sql/datasketches_theta_sketch.sql
new file mode 100644
index 0000000..f5200ff
--- /dev/null
+++ b/sql/datasketches_theta_sketch.sql
@@ -0,0 +1,93 @@
+-- Copyright 2019, Verizon Media.
+-- Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+
+-- Shell declaration of the type so that the I/O functions below can refer to it
+-- before the full definition is given.
+CREATE TYPE theta_sketch;
+
+-- Input function for the type: bound to the C symbol pg_sketch_in in the datasketches library.
+CREATE OR REPLACE FUNCTION theta_sketch_in(cstring) RETURNS theta_sketch
+     AS '$libdir/datasketches', 'pg_sketch_in'
+     LANGUAGE C STRICT IMMUTABLE;
+
+-- Output function for the type: bound to the C symbol pg_sketch_out in the datasketches library.
+CREATE OR REPLACE FUNCTION theta_sketch_out(theta_sketch) RETURNS cstring
+     AS '$libdir/datasketches', 'pg_sketch_out'
+     LANGUAGE C STRICT IMMUTABLE;
+
+-- Full type definition; STORAGE = EXTERNAL permits out-of-line storage without compression.
+CREATE TYPE theta_sketch (
+    INPUT = theta_sketch_in,
+    OUTPUT = theta_sketch_out,
+    STORAGE = EXTERNAL
+);
+
+-- Casts in both directions between bytea and theta_sketch. WITHOUT FUNCTION means the
+-- value is reinterpreted as-is; AS ASSIGNMENT allows the cast implicitly on assignment.
+CREATE CAST (bytea as theta_sketch) WITHOUT FUNCTION AS ASSIGNMENT;
+CREATE CAST (theta_sketch as bytea) WITHOUT FUNCTION AS ASSIGNMENT;
+
+-- Transition function for the build/distinct aggregates: (state, item).
+-- Not STRICT, so it is also invoked when the state or the item is NULL.
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement) RETURNS internal
+    AS '$libdir/datasketches', 'pg_theta_sketch_add_item'
+    LANGUAGE C IMMUTABLE;
+
+-- Same transition function with a third int argument, which the C code reads as lg_k.
+CREATE OR REPLACE FUNCTION theta_sketch_add_item(internal, anyelement, int) RETURNS internal
+    AS '$libdir/datasketches', 'pg_theta_sketch_add_item'
+    LANGUAGE C IMMUTABLE;
+
+-- Returns the estimate of a stored (serialized) sketch.
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate(theta_sketch) RETURNS double precision
+    AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate'
+    LANGUAGE C STRICT IMMUTABLE;
+
+-- Final function of theta_sketch_build: converts the aggregate state into a theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_sketch_from_internal(internal) RETURNS theta_sketch
+    AS '$libdir/datasketches', 'pg_theta_sketch_from_internal'
+    LANGUAGE C STRICT IMMUTABLE;
+
+-- Final function of theta_sketch_distinct: returns the estimate directly from the aggregate state.
+CREATE OR REPLACE FUNCTION theta_sketch_get_estimate_from_internal(internal) RETURNS double precision
+    AS '$libdir/datasketches', 'pg_theta_sketch_get_estimate_from_internal'
+    LANGUAGE C STRICT IMMUTABLE;
+
+-- Returns a text description of a stored sketch.
+CREATE OR REPLACE FUNCTION theta_sketch_to_string(theta_sketch) RETURNS TEXT
+    AS '$libdir/datasketches', 'pg_theta_sketch_to_string'
+    LANGUAGE C STRICT IMMUTABLE;
+
+-- Transition function for the union aggregate: (state, sketch).
+-- Not STRICT, so it is also invoked when the state or the sketch is NULL.
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch) RETURNS internal
+    AS '$libdir/datasketches', 'pg_theta_sketch_union'
+    LANGUAGE C IMMUTABLE;
+
+-- Same transition function with a third int argument, which the C code reads as lg_k.
+CREATE OR REPLACE FUNCTION theta_sketch_union(internal, theta_sketch, int) RETURNS internal
+    AS '$libdir/datasketches', 'pg_theta_sketch_union'
+    LANGUAGE C IMMUTABLE;
+
+-- Final function of the union aggregate: converts the union state into a theta_sketch value.
+CREATE OR REPLACE FUNCTION theta_union_get_result(internal) RETURNS theta_sketch
+    AS '$libdir/datasketches', 'pg_theta_union_get_result'
+    LANGUAGE C STRICT IMMUTABLE;
+
+-- Distinct-count estimate over a column of any type.
+CREATE AGGREGATE theta_sketch_distinct(anyelement) (
+    sfunc = theta_sketch_add_item,
+    stype = internal,
+    finalfunc = theta_sketch_get_estimate_from_internal
+);
+
+-- Distinct-count estimate with an explicit int parameter (passed to the transition function as lg_k).
+CREATE AGGREGATE theta_sketch_distinct(anyelement, int) (
+    sfunc = theta_sketch_add_item,
+    stype = internal,
+    finalfunc = theta_sketch_get_estimate_from_internal
+);
+
+-- Builds a theta_sketch value from a column of any type.
+CREATE AGGREGATE theta_sketch_build(anyelement) (
+    sfunc = theta_sketch_add_item,
+    stype = internal,
+    finalfunc = theta_sketch_from_internal
+);
+
+-- Builds a theta_sketch value with an explicit int parameter (passed to the transition function as lg_k).
+CREATE AGGREGATE theta_sketch_build(anyelement, int) (
+    sfunc = theta_sketch_add_item,
+    stype = internal,
+    finalfunc = theta_sketch_from_internal
+);
+
+-- Merges a column of theta_sketch values into a single theta_sketch.
+CREATE AGGREGATE theta_sketch_union(theta_sketch) (
+    sfunc = theta_sketch_union,
+    stype = internal,
+    finalfunc = theta_union_get_result
+);
+
+-- Merges theta_sketch values with an explicit int parameter (passed to the transition function as lg_k).
+CREATE AGGREGATE theta_sketch_union(theta_sketch, int) (
+    sfunc = theta_sketch_union,
+    stype = internal,
+    finalfunc = theta_union_get_result
+);
diff --git a/src/theta_sketch_c_adapter.cpp b/src/theta_sketch_c_adapter.cpp
new file mode 100644
index 0000000..968204c
--- /dev/null
+++ b/src/theta_sketch_c_adapter.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include "theta_sketch_c_adapter.h"
+#include "allocator.h"
+
+extern "C" {
+#include <postgres.h>
+}
+
+#include <sstream>
+
+#include <theta_sketch.hpp>
+#include <theta_union.hpp>
+
+// Theta sketch classes instantiated with palloc_allocator (from allocator.h).
+// NOTE(review): presumably this routes the sketches' internal allocations through
+// PostgreSQL's palloc/pfree -- confirm against allocator.h.
+typedef datasketches::theta_sketch_alloc<palloc_allocator<void>> theta_sketch_pg;
+typedef datasketches::update_theta_sketch_alloc<palloc_allocator<void>> update_theta_sketch_pg;
+typedef datasketches::compact_theta_sketch_alloc<palloc_allocator<void>> compact_theta_sketch_pg;
+typedef datasketches::theta_union_alloc<palloc_allocator<void>> theta_union_pg;
+
+/*
+ * Creates an update theta sketch with the builder's default settings.
+ * The object is placement-constructed in memory obtained from palloc(), so it has to be
+ * released with theta_sketch_delete() (destructor + pfree), not with operator delete.
+ * A C++ exception is reported to PostgreSQL through elog(ERROR).
+ */
+void* theta_sketch_new_default() {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().build());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Creates an update theta sketch whose builder is configured with the given lg_k.
+ * The object is placement-constructed in memory obtained from palloc(), so it has to be
+ * released with theta_sketch_delete() (destructor + pfree), not with operator delete.
+ * A C++ exception (for example from the builder) is reported through elog(ERROR).
+ */
+void* theta_sketch_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(update_theta_sketch_pg))) update_theta_sketch_pg(update_theta_sketch_pg::builder().set_lg_k(lg_k).build());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Destroys a sketch created by this adapter: runs the destructor through the
+ * theta_sketch_pg base type and then returns the object's memory with pfree().
+ * A C++ exception is reported through elog(ERROR).
+ */
+void theta_sketch_delete(void* sketchptr) {
+  try {
+    static_cast<theta_sketch_pg*>(sketchptr)->~theta_sketch_pg();
+    pfree(sketchptr);
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Feeds `length` bytes starting at `data` into the update sketch behind sketchptr.
+ * sketchptr must point to an update sketch (it is cast to update_theta_sketch_pg).
+ * A C++ exception is reported through elog(ERROR).
+ */
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length) {
+  try {
+    static_cast<update_theta_sketch_pg*>(sketchptr)->update(data, length);
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Converts an update sketch into a compact sketch.
+ * The compact sketch is placement-constructed in palloc'ed memory and returned;
+ * the input update sketch is destroyed and its memory freed, so the caller must not
+ * use sketchptr afterwards.
+ * A C++ exception is reported through elog(ERROR).
+ */
+void* theta_sketch_compact(void* sketchptr) {
+  try {
+    auto newptr = new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<update_theta_sketch_pg*>(sketchptr)->compact());
+    // the source sketch is consumed only after the compact copy has been built
+    static_cast<update_theta_sketch_pg*>(sketchptr)->~update_theta_sketch_pg();
+    pfree(sketchptr);
+    return newptr;
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Returns get_estimate() of the sketch behind sketchptr (accessed through the
+ * theta_sketch_pg base type, so both update and compact sketches are accepted).
+ * A C++ exception is reported through elog(ERROR).
+ */
+double theta_sketch_get_estimate(const void* sketchptr) {
+  try {
+    return static_cast<const theta_sketch_pg*>(sketchptr)->get_estimate();
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Writes the sketch's to_stream() text into `buffer`.
+ * At most `length` bytes are written including the terminating NUL; longer text is truncated.
+ * A C++ exception is reported through elog(ERROR).
+ */
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length) {
+  try {
+    std::stringstream s;
+    static_cast<const theta_sketch_pg*>(sketchptr)->to_stream(s);
+    // copy the text verbatim: it must not be used as the format string itself
+    snprintf(buffer, length, "%s", s.str().c_str());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Serializes the sketch into a bytea and returns it.
+ * serialize() is called with VARHDRSZ, and the resulting buffer is then stamped with
+ * SET_VARSIZE using the size that serialize() reported.
+ * NOTE(review): presumably the VARHDRSZ argument reserves room for the varlena header and
+ * the reported size includes it -- confirm against theta_sketch.hpp.
+ * Ownership of the buffer passes to the caller. A C++ exception is reported through elog(ERROR).
+ */
+void* theta_sketch_serialize(const void* sketchptr) {
+  try {
+    auto data = static_cast<const theta_sketch_pg*>(sketchptr)->serialize(VARHDRSZ);
+    bytea* buffer = (bytea*) data.first.release();
+    const size_t length = data.second;
+    SET_VARSIZE(buffer, length);
+    return buffer;
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Reconstructs a sketch from `length` bytes of serialized data at `buffer`.
+ * Returns the released pointer from theta_sketch_pg::deserialize(); the caller owns it
+ * and releases it with theta_sketch_delete().
+ * A C++ exception (for example on malformed input) is reported through elog(ERROR).
+ */
+void* theta_sketch_deserialize(const char* buffer, unsigned length) {
+  try {
+    auto ptr = theta_sketch_pg::deserialize(buffer, length);
+    return ptr.release();
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Creates a theta union with the builder's default settings.
+ * The object is placement-constructed in memory obtained from palloc(), so it has to be
+ * released with theta_union_delete() (destructor + pfree).
+ * A C++ exception is reported through elog(ERROR).
+ */
+void* theta_union_new_default() {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().build());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Creates a theta union whose builder is configured with the given lg_k.
+ * The object is placement-constructed in memory obtained from palloc(), so it has to be
+ * released with theta_union_delete() (destructor + pfree).
+ * A C++ exception (for example from the builder) is reported through elog(ERROR).
+ */
+void* theta_union_new(unsigned lg_k) {
+  try {
+    return new (palloc(sizeof(theta_union_pg))) theta_union_pg(theta_union_pg::builder().set_lg_k(lg_k).build());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Destroys a union created by this adapter: runs its destructor and then returns
+ * the object's memory with pfree().
+ * A C++ exception is reported through elog(ERROR).
+ */
+void theta_union_delete(void* unionptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->~theta_union_pg();
+    pfree(unionptr);
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Merges the sketch behind sketchptr into the union behind unionptr.
+ * The sketch is only read (it is passed by const reference) and stays owned by the caller.
+ * A C++ exception is reported through elog(ERROR).
+ */
+void theta_union_update(void* unionptr, const void* sketchptr) {
+  try {
+    static_cast<theta_union_pg*>(unionptr)->update(*static_cast<const theta_sketch_pg*>(sketchptr));
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
+
+/*
+ * Returns the union's current result as a new compact sketch, placement-constructed in
+ * palloc'ed memory. The caller owns the returned sketch (release it with
+ * theta_sketch_delete()); the union itself is not destroyed here.
+ * A C++ exception is reported through elog(ERROR).
+ */
+void* theta_union_get_result(void* unionptr) {
+  try {
+    return new (palloc(sizeof(compact_theta_sketch_pg))) compact_theta_sketch_pg(static_cast<theta_union_pg*>(unionptr)->get_result());
+  } catch (const std::exception& e) {
+    // the message is data, not a format: a '%' in it must not be interpreted by elog
+    elog(ERROR, "%s", e.what());
+  }
+}
diff --git a/src/theta_sketch_c_adapter.h b/src/theta_sketch_c_adapter.h
new file mode 100644
index 0000000..57c3df5
--- /dev/null
+++ b/src/theta_sketch_c_adapter.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#ifndef THETA_SKETCH_C_ADAPTER_H
+#define THETA_SKETCH_C_ADAPTER_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Sketch construction and destruction. Sketches are passed around as opaque void pointers. */
+void* theta_sketch_new_default();
+void* theta_sketch_new(unsigned lg_k);
+void theta_sketch_delete(void* sketchptr);
+
+/* Operations on an existing sketch. theta_sketch_compact consumes its argument and returns a new sketch. */
+void theta_sketch_update(void* sketchptr, const void* data, unsigned length);
+void* theta_sketch_compact(void* sketchptr);
+/* NOTE(review): theta_sketch_c_adapter.cpp as added by this commit contains no definition of
+   theta_sketch_union -- confirm whether this declaration is stale. */
+void theta_sketch_union(void* sketchptr1, const void* sketchptr2);
+double theta_sketch_get_estimate(const void* sketchptr);
+/* Writes at most `length` bytes (including the terminating NUL) into buffer. */
+void theta_sketch_to_string(const void* sketchptr, char* buffer, unsigned length);
+
+/* Conversion between sketch objects and their serialized form. */
+void* theta_sketch_serialize(const void* sketchptr);
+void* theta_sketch_deserialize(const char* buffer, unsigned length);
+
+/* Union construction, destruction and use. Unions are passed around as opaque void pointers. */
+void* theta_union_new_default();
+void* theta_union_new(unsigned lg_k);
+void theta_union_delete(void* unionptr);
+void theta_union_update(void* unionptr, const void* sketchptr);
+void* theta_union_get_result(void* unionptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/theta_sketch_pg_functions.c b/src/theta_sketch_pg_functions.c
new file mode 100644
index 0000000..2c19264
--- /dev/null
+++ b/src/theta_sketch_pg_functions.c
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2019, Verizon Media.
+ * Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
+ */
+
+#include <postgres.h>
+#include <fmgr.h>
+#include <utils/lsyscache.h>
+#include <utils/builtins.h>
+
+#include "theta_sketch_c_adapter.h"
+#include "base64.h"
+
+/*
+ * PG_FUNCTION_INFO_V1 exposes each SQL-callable entry point to PostgreSQL.
+ * Every C symbol named in a CREATE FUNCTION ... LANGUAGE C statement needs an entry here,
+ * otherwise the function cannot be loaded.
+ */
+PG_FUNCTION_INFO_V1(pg_theta_sketch_add_item);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_to_string);
+/* the function defined in this file and bound in the SQL script is pg_theta_sketch_union */
+PG_FUNCTION_INFO_V1(pg_theta_sketch_union);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_sketch_get_estimate_from_internal);
+PG_FUNCTION_INFO_V1(pg_theta_union_get_result);
+
+/* function declarations */
+/* NOTE(review): pg_theta_sketch_recv and pg_theta_sketch_send are declared but not defined in this file */
+Datum pg_theta_sketch_recv(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_send(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS);
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS);
+
+/*
+ * Aggregate transition function behind theta_sketch_add_item(internal, anyelement [, int]).
+ *
+ * Argument 0: running state, a pointer to an update sketch (NULL before the first item).
+ * Argument 1: the value to add, of any type.
+ * Argument 2: optional lg_k used when the sketch is created; absent, NULL or 0 selects
+ *             the default sketch.
+ *
+ * Returns the state pointer, or NULL when both the state and the value are NULL.
+ * Raises an error when called outside an aggregate.
+ */
+Datum pg_theta_sketch_add_item(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  int lg_k = 0;
+
+  // anyelement
+  Oid   element_type;
+  Datum element;
+  int16 typlen;
+  bool  typbyval;
+  char  typalign;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_add_item called in non-aggregate context");
+  }
+  // the sketch has to outlive this call, so allocate it in the aggregate's context
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (PG_ARGISNULL(0)) {
+    // the two-argument SQL signature passes no lg_k: read argument 2 only when it was supplied
+    if (PG_NARGS() > 2 && !PG_ARGISNULL(2)) {
+      lg_k = PG_GETARG_INT32(2);
+    }
+    sketchptr = lg_k ? theta_sketch_new(lg_k) : theta_sketch_new_default();
+  } else {
+    sketchptr = PG_GETARG_POINTER(0);
+  }
+
+  // find out how the polymorphic argument is represented so that its raw bytes can be hashed
+  element_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
+  element = PG_GETARG_DATUM(1);
+  get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);
+  if (typlen == -1) {
+    // varlena
+    theta_sketch_update(sketchptr, VARDATA_ANY(element), VARSIZE_ANY_EXHDR(element));
+  } else if (typbyval) {
+    // fixed-length passed by value: the first typlen bytes of the Datum itself
+    theta_sketch_update(sketchptr, &element, typlen);
+  } else {
+    // fixed-length passed by reference
+    theta_sketch_update(sketchptr, (void*)element, typlen);
+  }
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(sketchptr);
+}
+
+/*
+ * SQL function theta_sketch_get_estimate(theta_sketch): deserializes the stored sketch,
+ * reads its estimate, frees the temporary sketch object and returns the estimate as float8.
+ */
+Datum pg_theta_sketch_get_estimate(PG_FUNCTION_ARGS) {
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  // the payload starts after the varlena header
+  void* sketch = theta_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+  const double result = theta_sketch_get_estimate(sketch);
+
+  theta_sketch_delete(sketch);
+  PG_RETURN_FLOAT8(result);
+}
+
+/*
+ * SQL function theta_sketch_to_string(theta_sketch): deserializes the stored sketch and
+ * returns its text description. The description is formatted into a fixed 1024-byte
+ * buffer, so longer text is truncated.
+ */
+Datum pg_theta_sketch_to_string(PG_FUNCTION_ARGS) {
+  char text_buf[1024];
+  const bytea* serialized = PG_GETARG_BYTEA_P(0);
+  // the payload starts after the varlena header
+  void* sketch = theta_sketch_deserialize(VARDATA(serialized), VARSIZE(serialized) - VARHDRSZ);
+
+  theta_sketch_to_string(sketch, text_buf, sizeof(text_buf));
+  theta_sketch_delete(sketch);
+  PG_RETURN_TEXT_P(cstring_to_text(text_buf));
+}
+
+/*
+ * Aggregate transition function behind theta_sketch_union(internal, theta_sketch [, int]).
+ *
+ * Argument 0: running state, a pointer to a theta union (NULL before the first sketch).
+ * Argument 1: the serialized sketch to merge into the union.
+ * Argument 2: optional lg_k used when the union is created; absent, NULL or 0 selects
+ *             the default union.
+ *
+ * Returns the state pointer, or NULL when both the state and the sketch are NULL.
+ * Raises an error when called outside an aggregate.
+ */
+Datum pg_theta_sketch_union(PG_FUNCTION_ARGS) {
+  void* unionptr;
+  bytea* sketch_bytes;
+  void* sketchptr;
+  int lg_k = 0;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0) && PG_ARGISNULL(1)) {
+    PG_RETURN_NULL();
+  } else if (PG_ARGISNULL(1)) {
+    PG_RETURN_POINTER(PG_GETARG_POINTER(0)); // no update value. return unmodified state
+  }
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    elog(ERROR, "theta_sketch_union called in non-aggregate context");
+  }
+  // the union has to outlive this call, so allocate it in the aggregate's context
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  if (PG_ARGISNULL(0)) {
+    // the two-argument SQL signature passes no lg_k: read argument 2 only when it was supplied
+    if (PG_NARGS() > 2 && !PG_ARGISNULL(2)) {
+      lg_k = PG_GETARG_INT32(2);
+    }
+    unionptr = lg_k ? theta_union_new(lg_k) : theta_union_new_default();
+  } else {
+    unionptr = PG_GETARG_POINTER(0);
+  }
+
+  // the deserialized sketch is only needed for the duration of the merge
+  sketch_bytes = PG_GETARG_BYTEA_P(1);
+  sketchptr = theta_sketch_deserialize(VARDATA(sketch_bytes), VARSIZE(sketch_bytes) - VARHDRSZ);
+  theta_union_update(unionptr, sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_POINTER(unionptr);
+}
+
+/*
+ * Aggregate final function: turns the update-sketch state into a serialized theta_sketch.
+ * The state is compacted (which consumes the update sketch), serialized, and the compact
+ * sketch is then freed. Returns NULL for a NULL state; raises an error when called
+ * outside an aggregate.
+ */
+Datum pg_theta_sketch_from_internal(PG_FUNCTION_ARGS) {
+  MemoryContext agg_ctx;
+  MemoryContext saved_ctx;
+  void* compact;
+  bytea* serialized;
+
+  if (PG_ARGISNULL(0)) {
+    PG_RETURN_NULL();
+  }
+  if (!AggCheckCallContext(fcinfo, &agg_ctx)) {
+    elog(ERROR, "theta_sketch_from_internal called in non-aggregate context");
+  }
+
+  // all work, including the returned bytea, is allocated in the aggregate's context
+  saved_ctx = MemoryContextSwitchTo(agg_ctx);
+  compact = theta_sketch_compact(PG_GETARG_POINTER(0));
+  serialized = theta_sketch_serialize(compact);
+  theta_sketch_delete(compact);
+  MemoryContextSwitchTo(saved_ctx);
+
+  PG_RETURN_BYTEA_P(serialized);
+}
+
+/*
+ * Aggregate final function: returns the estimate of the sketch held in the aggregate state
+ * as float8 and frees the sketch. Returns NULL for a NULL state; raises an error when
+ * called outside an aggregate.
+ */
+Datum pg_theta_sketch_get_estimate_from_internal(PG_FUNCTION_ARGS) {
+  void* sketchptr;
+  double estimate;
+
+  MemoryContext oldcontext;
+  MemoryContext aggcontext;
+
+  if (PG_ARGISNULL(0)) PG_RETURN_NULL();
+
+  if (!AggCheckCallContext(fcinfo, &aggcontext)) {
+    // report this function's own name so the error can be traced to the right entry point
+    elog(ERROR, "theta_sketch_get_estimate_from_internal called in non-aggregate context");
+  }
+  oldcontext = MemoryContextSwitchTo(aggcontext);
+
+  sketchptr = PG_GETARG_POINTER(0);
+  estimate = theta_sketch_get_estimate(sketchptr);
+  theta_sketch_delete(sketchptr);
+
+  MemoryContextSwitchTo(oldcontext);
+
+  PG_RETURN_FLOAT8(estimate);
+}
+
+/*
+ * Aggregate final function: extracts the union's result as a compact sketch, serializes it
+ * into a theta_sketch value, then frees both the temporary sketch and the union state.
+ * Returns NULL for a NULL state; raises an error when called outside an aggregate.
+ */
+Datum pg_theta_union_get_result(PG_FUNCTION_ARGS) {
+  MemoryContext agg_ctx;
+  MemoryContext saved_ctx;
+  void* state;
+  void* result_sketch;
+  bytea* serialized;
+
+  if (PG_ARGISNULL(0)) {
+    PG_RETURN_NULL();
+  }
+  if (!AggCheckCallContext(fcinfo, &agg_ctx)) {
+    elog(ERROR, "theta_union_get_result called in non-aggregate context");
+  }
+
+  // all work, including the returned bytea, is allocated in the aggregate's context
+  saved_ctx = MemoryContextSwitchTo(agg_ctx);
+  state = PG_GETARG_POINTER(0);
+  result_sketch = theta_union_get_result(state);
+  serialized = theta_sketch_serialize(result_sketch);
+  theta_sketch_delete(result_sketch);
+  theta_union_delete(state);
+  MemoryContextSwitchTo(saved_ctx);
+
+  PG_RETURN_BYTEA_P(serialized);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org