You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by pl...@apache.org on 2024/03/16 17:20:30 UTC

(datasketches-characterization) branch go-characterization updated: Rebase Go implemn from Java Job runners

This is an automated email from the ASF dual-hosted git repository.

placave pushed a commit to branch go-characterization
in repository https://gitbox.apache.org/repos/asf/datasketches-characterization.git


The following commit(s) were added to refs/heads/go-characterization by this push:
     new e7d8724  Rebase Go implemn from Java Job runners
e7d8724 is described below

commit e7d8724b878e79256db1d77b615681c8a2e20db3
Author: Pierre Lacave <pi...@datadoghq.com>
AuthorDate: Sat Mar 16 18:20:20 2024 +0100

    Rebase Go implemn from Java Job runners
---
 go/configs.go                         |  35 +++++
 go/distinct_count_accuracy_profile.go | 257 ++++++++++++++++++++++++----------
 go/hll_sketch_accuracy_profile.go     |  33 +++--
 go/job.go                             |  35 ++---
 go/main.go                            |   2 +-
 5 files changed, 243 insertions(+), 119 deletions(-)

diff --git a/go/configs.go b/go/configs.go
new file mode 100644
index 0000000..b63fa46
--- /dev/null
+++ b/go/configs.go
@@ -0,0 +1,35 @@
+package main
+
+import "github.com/apache/datasketches-go/hll"
+
+type distinctCountJobConfigType struct {
+	lgMinU int // log2 of the smallest true-unique count that gets a stats point
+	lgMaxU int // log2 of the upper bound on the true-unique counts
+	UPPO   int // points per octave along the uniques axis (horizontal resolution)
+
+	lgMinT int // log2 of the number of trials in the first batch
+	lgMaxT int // log2 of the total number of trials to run
+	TPPO   int // points per octave for the intermediate trial counts
+
+	lgQK      int  // size parameter of the quantiles sketch kept for each stats point
+	interData bool // when true, print the results table at every intermediate trial point
+
+	runner DistinctCountAccuracyProfileRunner // executes the individual trials
+}
+
+var (
+	distinctCountJobConfig = distinctCountJobConfigType{
+		lgMinU: 0,  // first stats point at 2^0 = 1 unique
+		lgMaxU: 20, // stats points up to 2^20 uniques
+		UPPO:   16,
+
+		lgMinT: 8,  // first batch is 2^8 trials
+		lgMaxT: 20, // stop once 2^20 trials have run
+		TPPO:   1,
+
+		lgQK:      12,
+		interData: true,
+
+		runner: NewHllSketchAccuracyRunner(4 /* lgK */, hll.TgtHllTypeHll8 /* tgtType */),
+	}
+)
diff --git a/go/distinct_count_accuracy_profile.go b/go/distinct_count_accuracy_profile.go
index cddaa54..f399731 100644
--- a/go/distinct_count_accuracy_profile.go
+++ b/go/distinct_count_accuracy_profile.go
@@ -18,112 +18,215 @@ package main
 
 import (
 	"fmt"
+	"github.com/apache/datasketches-go/common"
+	"github.com/apache/datasketches-go/kll"
+	"math"
+	"strings"
 	"time"
 )
 
+const (
+	M4SD = 0.0000316712418331 // fraction of a normal distribution below -4 StdDev
+	M3SD = 0.0013498980316301 // fraction of a normal distribution below -3 StdDev
+	M2SD = 0.0227501319481792 // fraction of a normal distribution below -2 StdDev
+	M1SD = 0.1586552539314570 // fraction of a normal distribution below -1 StdDev
+	MED  = 0.5                // median
+	P1SD = 0.8413447460685430 // fraction of a normal distribution below +1 StdDev
+	P2SD = 0.9772498680518210 // fraction of a normal distribution below +2 StdDev
+	P3SD = 0.9986501019683700 // fraction of a normal distribution below +3 StdDev
+	P4SD = 0.9999683287581670 // fraction of a normal distribution below +4 StdDev
+)
+
+var (
+	GAUSSIANS_4SD = []float64{0.0, M4SD, M3SD, M2SD, M1SD, MED, P1SD, P2SD, P3SD, P4SD, 1.0} // ranks requested from the quantiles sketch: min, -4..+4 StdDev, max
+)
+
 type DistinctCountAccuracyProfileRunner interface {
-	runTrial(stats []accuracyStats, key uint64) uint64
+	runTrial(stats []*accuracyStats, key uint64) uint64
 }
 
 type accuracyStats struct {
-	trueValue   int
+	trueValue   uint64
 	sumEst      float64
 	sumRelErr   float64
 	sumSqRelErr float64
 	count       int
-	//kll_sketch<double> rel_err_distribution;
+	// TODO: make this a sketch of float64 values instead of int64.
+	rel_err_distribution *kll.ItemsSketch[int64]
+}
+
+// update folds one trial's estimate into the running statistics for this
+// true-unique count.
+//
+// The quantiles sketch and sumSqRelErr are fed in the units that process
+// consumes: process divides every quantile by trueValue and subtracts 1, and
+// divides the mean of sumSqRelErr by trueValue squared before taking the
+// square root. Both therefore have to hold un-normalized values here (the raw
+// estimate and the squared absolute error); storing already-normalized values
+// would normalize them twice.
+func (a *accuracyStats) update(est float64) {
+	trueValue := float64(a.trueValue)
+	absErr := est - trueValue
+
+	a.sumEst += est
+	a.sumRelErr += est/trueValue - 1.0
+	a.sumSqRelErr += absErr * absErr
+	// The sketch stores int64 items, so round the estimate instead of truncating it.
+	a.rel_err_distribution.Update(int64(math.Round(est)))
+	a.count++
+}
 
 type DistinctCountAccuracyProfile struct {
-	runner DistinctCountAccuracyProfileRunner
-	stats  []accuracyStats
+	config    distinctCountJobConfigType
+	runner    DistinctCountAccuracyProfileRunner
+	stats     []*accuracyStats
+	startTime int64
 }
 
-func NewDistinctCountAccuracyProfile(runner DistinctCountAccuracyProfileRunner) *DistinctCountAccuracyProfile {
+func NewDistinctCountAccuracyProfile(config distinctCountJobConfigType) *DistinctCountAccuracyProfile {
 	return &DistinctCountAccuracyProfile{
-		runner,
-		make([]accuracyStats, 0),
+		config:    config,
+		runner:    config.runner,
+		stats:     buildLog2AccuracyStatsArray(config.lgMinU, config.lgMaxU, config.UPPO, config.lgQK),
+		startTime: time.Now().UnixMilli(),
 	}
 }
 
 func (d *DistinctCountAccuracyProfile) run() {
-	const (
-		lgMinTrials       = 4
-		lgMaxTrials       = 16
-		trialsPpo         = 4
-		printIntermediate = true
-
-		minT      = uint64(1) << lgMinTrials
-		maxTrials = 1 << lgMaxTrials
-
-		lgMinCounts = 0
-		lgMaxCounts = 32
-		countsPpo   = 16
-
-		quantilesK = 10000
-	)
-
-	var (
-		numPoints = countPoints(lgMinCounts, lgMaxCounts, countsPpo)
-		p         = uint64(1) << lgMinCounts
-		key       = uint64(0)
-	)
-
-	for i := 0; i < numPoints; i++ {
-		d.stats = append(d.stats, accuracyStats{
-			trueValue: quantilesK,
-			//rel_err_distribution: p,
-		})
-		p = pwr2LawNext(countsPpo, p)
-	}
+	minT := 1 << d.config.lgMinT
+	maxT := 1 << d.config.lgMaxT
+	maxU := 1 << d.config.lgMaxU
 
-	startTime := time.Now().UnixMilli()
+	vIn := uint64(0)
 
-	// this will generate a table of data up to each intermediate number of trials
-	lastTrials := uint64(0)
-	for lastTrials < maxTrials {
-		nextTrials := minT
-		if lastTrials != 0 {
-			nextTrials = pwr2LawNext(trialsPpo, lastTrials)
+	// This will generate a table of data for each intermediate Trials point
+	lastTpt := 0
+	for lastTpt < maxT {
+		nextT := lastTpt
+		if lastTpt == 0 {
+			nextT = minT
+		} else {
+			nextT = int(pwr2SeriesNext(d.config.TPPO, uint64(lastTpt)))
+		}
+		delta := nextT - lastTpt
+		for i := 0; i < delta; i++ {
+			vIn = d.runner.runTrial(d.stats, vIn)
 		}
-		delta := nextTrials - lastTrials
-		for i := uint64(0); i < delta; i++ {
-			key = d.runner.runTrial(d.stats, key)
+		lastTpt = nextT
+		sb := &strings.Builder{}
+		if nextT < maxT {
+			if d.config.interData {
+				fmt.Println(getHeader())
+				process(d.stats, lastTpt, sb)
+				fmt.Println(sb.String())
+			}
+		} else {
+			fmt.Println(getHeader())
+			process(d.stats, lastTpt, sb)
+			fmt.Println(sb.String())
 		}
-		lastTrials = nextTrials
-		if printIntermediate || nextTrials == maxTrials {
-			d.printStats()
+
+		fmt.Printf("Config:             : %+v\n", d.config)
+		fmt.Printf("Cum Trials          : %d\n", lastTpt)
+		fmt.Printf("Cum Updates         : %d\n", vIn)
+		currentTime_mS := time.Now().UnixMilli()
+		cumTime_mS := currentTime_mS - d.startTime
+		fmt.Printf("Cum Time            : %s\n", time.Duration(cumTime_mS*1000*1000))
+		timePerTrial_mS := float64(cumTime_mS) / float64(lastTpt)
+		avgUpdateTime_ns := timePerTrial_mS * 1e6 / float64(maxU)
+		fmt.Printf("Time Per Trial, mSec: %f\n", timePerTrial_mS)
+		fmt.Printf("Avg Update Time, nSec: %f\n", avgUpdateTime_ns)
+		fmt.Printf("Date Time           : %s\n", time.Now().Format(time.RFC3339))
+		timeToComplete_mS := int64(timePerTrial_mS * float64(maxT-lastTpt))
+		fmt.Printf("Est Time to Complete: %s\n", time.Duration(timeToComplete_mS*1000*1000))
+		fmt.Printf("Est Time at Completion: %s\n", time.Now().Add(time.Duration(timeToComplete_mS*1000*1000)).Format(time.RFC3339))
+		fmt.Println("")
+	}
+}
+
+func process(qArr []*accuracyStats, cumTrials int, sb *strings.Builder) {
+	points := len(qArr)
+	sb.Reset()
+	for pt := 0; pt < points; pt++ {
+		q := qArr[pt]
+
+		trueUniques := q.trueValue
+
+		meanEst := q.sumEst / float64(cumTrials)
+		meanRelErr := q.sumRelErr / float64(cumTrials)
+		meanSqErr := q.sumSqRelErr / float64(cumTrials)
+		normMeanSqErr := meanSqErr / (float64(trueUniques) * float64(trueUniques))
+		rmsRelErr := math.Sqrt(normMeanSqErr)
+		//q.rmsre = rmsRelErr
+
+		sb.WriteString(fmt.Sprintf("%d", trueUniques))
+		sb.WriteString("\t")
+
+		sb.WriteString(fmt.Sprintf("%e", meanEst))
+		sb.WriteString("\t")
+
+		sb.WriteString(fmt.Sprintf("%e", meanRelErr))
+		sb.WriteString("\t")
+
+		sb.WriteString(fmt.Sprintf("%e", rmsRelErr))
+		sb.WriteString("\t")
+
+		sb.WriteString(fmt.Sprintf("%d", cumTrials))
+		sb.WriteString("\t")
+
+		quants, _ := q.rel_err_distribution.GetQuantiles(GAUSSIANS_4SD, true)
+		for i := 0; i < len(quants); i++ {
+			sb.WriteString(fmt.Sprintf("%e", float64(quants[i])/(float64(trueUniques))-1.0))
+			sb.WriteString("\t")
 		}
 
-		fmt.Println("Cum Trials             : ", lastTrials)
-		fmt.Println("Cum Updates            : ", key)
-		cumTimeMs := time.Now().UnixMilli() - startTime
-		fmt.Println("Cum Time, ms           : ", cumTimeMs)
-		timePerTrialMs := float64(cumTimeMs) / float64(lastTrials)
-		fmt.Println("Avg Time Per Trial, ms : ", timePerTrialMs)
-		currentTime := time.Now()
-		fmt.Println("Current time           : ", currentTime)
-		timeToCompleteMs := float64(cumTimeMs) / float64(lastTrials) * float64(maxTrials-lastTrials)
-		estCompletionTime := currentTime.Add(time.Duration(timeToCompleteMs) * time.Millisecond)
-		fmt.Println("Est Time of Completion : ", estCompletionTime)
-		fmt.Println()
+		sb.WriteString(fmt.Sprintf("%d", 0))
+		sb.WriteString("\t")
+		sb.WriteString(fmt.Sprintf("%d", 0))
+
+		sb.WriteString("\n")
 	}
 }
 
-func (d *DistinctCountAccuracyProfile) printStats() {
-	for _, stat := range d.stats {
-		fmt.Println(stat.trueValue)
-		fmt.Println(stat.count)
-		fmt.Println(stat.sumEst)
-		fmt.Println(stat.sumRelErr)
-		fmt.Println(stat.sumSqRelErr)
-		// quantiles
-		//const auto quants = stat.get_quantiles(FRACTIONS, FRACT_LEN);
-		//for (size_t i = 0; i < FRACT_LEN; i++) {
-		//	const double quantile = quants[i];
-		//	std::cout << quantile;
-		//	if (i != FRACT_LEN - 1) std::cout << "\t";
-		//}
-		fmt.Println()
+// getHeader returns the tab-separated column titles of the results table.
+func getHeader() string {
+	columns := []string{
+		"TrueU",
+		"MeanEst",
+		"MeanRelErr",
+		"RMS_RE",
+		"Trials",
+		"Min",
+		"Q(.0000317)",
+		"Q(.00135)",
+		"Q(.02275)",
+		"Q(.15866)",
+		"Q(.5)",
+		"Q(.84134)",
+		"Q(.97725)",
+		"Q(.99865)",
+		"Q(.9999683)",
+		"Max",
+		"Bytes",
+		"ReMerit",
+	}
+	return strings.Join(columns, "\t")
+}
+
+// buildLog2AccuracyStatsArray creates one accuracyStats entry per plotting
+// point between 2^lgMin and 2^lgMax, spaced at ppo points per octave. Each
+// entry records its true-unique count and gets its own quantiles sketch.
+//
+// lgQK is the log2 of the sketch size K. It panics if a sketch cannot be
+// created, since every later update would otherwise hit a missing sketch.
+func buildLog2AccuracyStatsArray(lgMin, lgMax, ppo, lgQK int) []*accuracyStats {
+	qLen := countPoints(lgMin, lgMax, ppo)
+	qArr := make([]*accuracyStats, qLen)
+	// lgQK is a log2 value, while the sketch constructor takes K itself.
+	qK := uint16(1) << lgQK
+	p := uint64(1) << lgMin
+	for i := 0; i < qLen; i++ {
+		kllSketch, err := kll.NewKllItemsSketch[int64](qK, 8, common.ArrayOfLongsSerDe{})
+		if err != nil {
+			panic(fmt.Sprintf("creating quantiles sketch (lgQK=%d): %v", lgQK, err))
+		}
+		qArr[i] = &accuracyStats{
+			trueValue:            p,
+			rel_err_distribution: kllSketch,
+		}
+		p = pwr2SeriesNext(ppo, p)
 	}
+	return qArr
 }
diff --git a/go/hll_sketch_accuracy_profile.go b/go/hll_sketch_accuracy_profile.go
index cd7ccfa..8d667c9 100644
--- a/go/hll_sketch_accuracy_profile.go
+++ b/go/hll_sketch_accuracy_profile.go
@@ -17,29 +17,34 @@
 
 package main
 
-import "github.com/apache/datasketches-go/hll"
+import (
+	"github.com/apache/datasketches-go/hll"
+)
 
-type HllSketchAccuracyProfile struct {
+type HllSketchAccuracyRunner struct {
+	sketch hll.HllSketch
 }
 
-func NewHllSketchAccuracyProfile() *HllSketchAccuracyProfile {
-	return &HllSketchAccuracyProfile{}
+// NewHllSketchAccuracyRunner builds a runner around a single HLL sketch
+// created with the given lgK and target type; runTrial resets and reuses that
+// sketch for every trial. It panics if the sketch cannot be created.
+func NewHllSketchAccuracyRunner(lgK int, tgtType hll.TgtHllType) *HllSketchAccuracyRunner {
+	sketch, err := hll.NewHllSketch(lgK, tgtType)
+	if err != nil {
+		// Fail here with the constructor's error instead of carrying on with an unusable sketch.
+		panic(err)
+	}
+	return &HllSketchAccuracyRunner{
+		sketch: sketch,
+	}
 }
 
-func (HllSketchAccuracyProfile) runTrial(stats []accuracyStats, key uint64) uint64 {
-	lgK := 12
-
-	s, _ := hll.NewHllSketch(lgK, hll.TgtHllTypeDefault)
-	count := 0
+func (h *HllSketchAccuracyRunner) runTrial(stats []*accuracyStats, key uint64) uint64 {
+	h.sketch.Reset()
 
+	lastUniques := uint64(0)
 	for _, stat := range stats {
-		delta := stat.trueValue - count
-		for i := 0; i < delta; i++ {
-			s.UpdateUInt64(key)
+		delta := stat.trueValue - lastUniques
+		for u := uint64(0); u < delta; u++ {
+			h.sketch.UpdateUInt64(key)
 			key++
 		}
-		count += delta
-		//stat.update(s.get_estimate())
+		lastUniques += delta
+		est, _ := h.sketch.GetEstimate()
+		stat.update(est)
 	}
 
 	return key
diff --git a/go/job.go b/go/job.go
index 7fc3a5b..38f19f4 100644
--- a/go/job.go
+++ b/go/job.go
@@ -23,53 +23,34 @@ type JobProfile interface {
 	run()
 }
 
-// countPoints Counts the actual number of plotting points between lgStart and lgEnd assuming the given PPO.
+// countPoints returns the actual number of plotting points between lgStart and lgEnd assuming the given PPO
+// and a logBase of 2.
 // This is not a simple linear function due to points that may be skipped in the low range.
-// param lgStart Log2 of the starting value
-// param lgEnd Log2 of the ending value
-// param ppo the number of logarithmically evenly spaced points per octave.
-// returns the actual number of plotting points between lgStart and lgEnd.
 func countPoints(lgStart, lgEnd, ppo int) int {
 	p := uint64(1) << lgStart
 	end := uint64(1) << lgEnd
 	count := 0
 	for p <= end {
-		p = pwr2LawNext(ppo, p)
+		p = pwr2SeriesNext(ppo, p)
 		count++
 	}
 	return count
 }
 
-// pwr2LawNext Computes the next larger integer point in the power series
-// point = 2^( i / ppo ) given the current point in the series.
-// For illustration, this can be used in a loop as follows:
-//
-//	int maxP = 1024;
-//	int minP = 1;
-//	int ppo = 2;
-//
-//	for (int p = minP; p <= maxP; p = pwr2LawNext(ppo, p)) {
-//	  System.out.print(p + " ");
-//	}
-//	//generates the following series:
-//	//1 2 3 4 6 8 11 16 23 32 45 64 91 128 181 256 362 512 724 1024
-//
-// param ppo Points-Per-Octave, or the number of points per integer powers of 2 in the series.
-// param curPoint the current point of the series. Must be &ge; 1.
-// returns the next point in the power series.
-func pwr2LawNext(ppo int, curPoint uint64) uint64 {
+func pwr2SeriesNext(ppo int, curPoint uint64) uint64 {
 	cur := curPoint
 	if cur < 1 {
 		cur = 1
 	}
-	gi := int64(math.Log2(float64(cur)) * float64(ppo))
+	gi := int(math.Round(math.Log2(float64(cur)) * float64(ppo)))
+
 	var next uint64
 	for {
-		next = uint64(math.Pow(2.0, float64(gi)/float64(ppo)))
+		gi++
+		next = uint64(math.Round(math.Pow(2.0, float64(gi)/float64(ppo))))
 		if next > curPoint {
 			break
 		}
-		gi++
 	}
 	return next
 }
diff --git a/go/main.go b/go/main.go
index 5733394..383b915 100644
--- a/go/main.go
+++ b/go/main.go
@@ -20,7 +20,7 @@ package main
 func main() {
 
 	jobs := map[string]JobProfile{
-		"distinct_count_accuracy_profile": NewDistinctCountAccuracyProfile(NewHllSketchAccuracyProfile()),
+		"distinct_count_accuracy_profile": NewDistinctCountAccuracyProfile(distinctCountJobConfig),
 	}
 
 	for _, job := range jobs {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org