You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/14 22:49:12 UTC

[beam] branch master updated: [BEAM-14470] Use Generic Registrations in loadtests. (#17673)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5064cc247ba [BEAM-14470] Use Generic Registrations in loadtests. (#17673)
5064cc247ba is described below

commit 5064cc247ba3ec2697cd7493b14cef8567d614f6
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Sat May 14 15:49:04 2022 -0700

    [BEAM-14470] Use Generic Registrations in loadtests. (#17673)
---
 .../jenkins/job_LoadTests_SideInput_Go.groovy      |  8 ++---
 .../perftests_metrics/SideInput_Load_Tests.json    |  2 +-
 sdks/go/pkg/beam/core/graph/fn.go                  |  4 ++-
 sdks/go/pkg/beam/io/synthetic/source.go            |  6 ++--
 sdks/go/pkg/beam/io/synthetic/step.go              |  7 ++--
 sdks/go/test/load/cogbk/cogbk.go                   |  8 +++--
 sdks/go/test/load/combine/combine.go               |  8 +++++
 sdks/go/test/load/group_by_key/group_by_key.go     | 38 +++++++++++++++-------
 sdks/go/test/load/pardo/pardo.go                   | 10 ++++--
 sdks/go/test/load/sideinput/sideinput.go           | 32 ++++++++++++------
 sdks/go/test/load/util.go                          | 21 ++++++++----
 11 files changed, 99 insertions(+), 45 deletions(-)

diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
index 225bbc79998..55809ac7a10 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy
@@ -29,7 +29,7 @@ String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC'))
 def batchScenarios = {
   [
     [
-      title          : 'SideInput Go Load test: 400mb-1kb-10workers-1window-first-iterable',
+      title          : 'SideInput Go Load test: 10gb-1kb-10workers-1window-first-iterable',
       test           : 'sideinput',
       runner         : CommonTestProperties.Runner.DATAFLOW,
       pipelineOptions: [
@@ -41,7 +41,7 @@ def batchScenarios = {
         influx_namespace     : 'dataflow',
         influx_measurement   : 'go_batch_sideinput_3',
         input_options        : '\'{' +
-        '"num_records": 400000,' +
+        '"num_records": 10000000,' +
         '"key_size": 100,' +
         '"value_size": 900}\'',
         access_percentage: 1,
@@ -52,7 +52,7 @@ def batchScenarios = {
       ]
     ],
     [
-      title          : 'SideInput Go Load test: 400mb-1kb-10workers-1window-iterable',
+      title          : 'SideInput Go Load test: 10gb-1kb-10workers-1window-iterable',
       test           : 'sideinput',
       runner         : CommonTestProperties.Runner.DATAFLOW,
       pipelineOptions: [
@@ -64,7 +64,7 @@ def batchScenarios = {
         influx_namespace     : 'dataflow',
         influx_measurement   : 'go_batch_sideinput_4',
         input_options        : '\'{' +
-        '"num_records": 400000,' +
+        '"num_records": 10000000,' +
         '"key_size": 100,' +
         '"value_size": 900}\'',
         num_workers          : 10,
diff --git a/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json b/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json
index 62616951c80..cee6bb70573 100644
--- a/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json
+++ b/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json
@@ -21,7 +21,7 @@
   "links": [],
   "panels": [
     {
-      "content": "The following options should be used by default:\n* key size: 100B\n* value size: 900B\n* number of workers: 10\n* size of the window (if fixed windows are used): 1 second\n\n[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job [...]
+      "content": "[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)",
       "datasource": null,
       "gridPos": {
         "h": 8,
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 931458d922d..0dc19e17666 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -122,7 +122,6 @@ func NewFn(fn interface{}) (*Fn, error) {
 				}
 				methods[name] = f
 			}
-			return &Fn{Recv: fn, methods: methods, annotations: annotations}, nil
 		}
 		// TODO(lostluck): Consider moving this into the reflectx package.
 		for i := 0; i < val.Type().NumMethod(); i++ {
@@ -133,6 +132,9 @@ func NewFn(fn interface{}) (*Fn, error) {
 			if m.Name == "String" {
 				continue // skip: harmless
 			}
+			if _, ok := methods[m.Name]; ok {
+				continue // skip: already wrapped
+			}
 
 			// CAVEAT(herohde) 5/22/2017: The type val.Type.Method.Type is not
 			// the same as val.Method.Type: the former has the explicit receiver.
diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go
index 822c416f696..d80a89c931c 100644
--- a/sdks/go/pkg/beam/io/synthetic/source.go
+++ b/sdks/go/pkg/beam/io/synthetic/source.go
@@ -27,17 +27,17 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
-	"reflect"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem())
-	beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem())
+	register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func([]byte, []byte), error]((*sourceFn)(nil))
+	register.Emitter2[[]byte, []byte]()
 }
 
 // Source creates a synthetic source transform that emits randomly
diff --git a/sdks/go/pkg/beam/io/synthetic/step.go b/sdks/go/pkg/beam/io/synthetic/step.go
index 3691fec9b4f..d800f5a054d 100644
--- a/sdks/go/pkg/beam/io/synthetic/step.go
+++ b/sdks/go/pkg/beam/io/synthetic/step.go
@@ -18,18 +18,19 @@ package synthetic
 import (
 	"fmt"
 	"math/rand"
-	"reflect"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem())
-	beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem())
+	register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*stepFn)(nil))
+	register.DoFn4x0[*sdf.LockRTracker, []byte, []byte, func([]byte, []byte)]((*sdfStepFn)(nil))
+	register.Emitter2[[]byte, []byte]()
 }
 
 // Step creates a synthetic step transform that receives KV<[]byte, []byte>
diff --git a/sdks/go/test/load/cogbk/cogbk.go b/sdks/go/test/load/cogbk/cogbk.go
index 77b196f1620..eefd0bddff1 100644
--- a/sdks/go/test/load/cogbk/cogbk.go
+++ b/sdks/go/test/load/cogbk/cogbk.go
@@ -18,11 +18,11 @@ package main
 import (
 	"context"
 	"flag"
-	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"github.com/apache/beam/sdks/v2/go/test/load"
 )
@@ -43,7 +43,9 @@ var (
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem())
+	register.DoFn4x0[[]byte, func(*[]byte) bool, func(*[]byte) bool, func([]byte, []byte)]((*ungroupAndReiterateFn)(nil))
+	register.Emitter2[[]byte, []byte]()
+	register.Iter1[[]byte]()
 }
 
 // ungroupAndReiterateFn reiterates given number of times over CoGBK's output.
@@ -51,6 +53,8 @@ type ungroupAndReiterateFn struct {
 	Iterations int
 }
 
+// TODO use re-iterators once supported.
+
 func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) {
 	var value []byte
 	for i := 0; i < fn.Iterations; i++ {
diff --git a/sdks/go/test/load/combine/combine.go b/sdks/go/test/load/combine/combine.go
index 32d46a1d94a..9f00bbf7990 100644
--- a/sdks/go/test/load/combine/combine.go
+++ b/sdks/go/test/load/combine/combine.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"github.com/apache/beam/sdks/v2/go/test/load"
@@ -52,6 +53,12 @@ func parseSyntheticConfig() synthetic.SourceConfig {
 	}
 }
 
+func init() {
+	register.Function2x1(compareLess)
+	register.Function3x0(getElement)
+	register.Emitter2[[]byte, []byte]()
+}
+
 func compareLess(key []byte, value []byte) bool {
 	return bytes.Compare(key, value) < 0
 }
@@ -73,6 +80,7 @@ func main() {
 		pcoll := top.LargestPerKey(s, src, *topCount, compareLess)
 		pcoll = beam.ParDo(s, getElement, pcoll)
 		pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
+		_ = pcoll
 	}
 
 	presult, err := beamx.RunWithMetrics(ctx, p)
diff --git a/sdks/go/test/load/group_by_key/group_by_key.go b/sdks/go/test/load/group_by_key/group_by_key.go
index 78871c96d6f..645afabedef 100644
--- a/sdks/go/test/load/group_by_key/group_by_key.go
+++ b/sdks/go/test/load/group_by_key/group_by_key.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"github.com/apache/beam/sdks/v2/go/test/load"
 )
@@ -52,6 +53,30 @@ func parseSyntheticConfig() synthetic.SourceConfig {
 	}
 }
 
+func init() {
+	register.DoFn2x2[[]byte, func(*[]byte) bool, []byte, []byte]((*ungroupAndReiterateFn)(nil))
+	register.Iter1[[]byte]()
+}
+
+// ungroupAndReiterateFn reiterates given number of times over GBK's output.
+type ungroupAndReiterateFn struct {
+	Iterations int
+}
+
+// TODO use re-iterators once supported.
+
+func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
+	var value []byte
+	for i := 0; i < fn.Iterations; i++ {
+		for values(&value) {
+			if i == fn.Iterations-1 {
+				return key, value
+			}
+		}
+	}
+	return key, []byte{0}
+}
+
 func main() {
 	flag.Parse()
 	beam.Init()
@@ -63,18 +88,9 @@ func main() {
 	src = beam.ParDo(s, &load.RuntimeMonitor{}, src)
 	for i := 0; i < *fanout; i++ {
 		pcoll := beam.GroupByKey(s, src)
-		pcoll = beam.ParDo(s, func(key []byte, values func(*[]byte) bool) ([]byte, []byte) {
-			for i := 0; i < *iterations; i++ {
-				var value []byte
-				for values(&value) {
-					if i == *iterations-1 {
-						return key, value
-					}
-				}
-			}
-			return key, []byte{0}
-		}, pcoll)
+		pcoll = beam.ParDo(s, &ungroupAndReiterateFn{*iterations}, pcoll)
 		pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll)
+		_ = pcoll
 	}
 
 	presult, err := beamx.RunWithMetrics(ctx, p)
diff --git a/sdks/go/test/load/pardo/pardo.go b/sdks/go/test/load/pardo/pardo.go
index 8114ac8a906..eceae82bc5d 100644
--- a/sdks/go/test/load/pardo/pardo.go
+++ b/sdks/go/test/load/pardo/pardo.go
@@ -19,11 +19,11 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"github.com/apache/beam/sdks/v2/go/test/load"
 )
@@ -48,7 +48,8 @@ var (
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*counterOperationFn)(nil)).Elem())
+	register.DoFn4x0[context.Context, []byte, []byte, func([]byte, []byte)]((*counterOperationFn)(nil))
+	register.Emitter2[[]byte, []byte]()
 }
 
 type counterOperationFn struct {
@@ -57,7 +58,10 @@ type counterOperationFn struct {
 }
 
 func newCounterOperationFn(operations, numCounters int) *counterOperationFn {
-	return &counterOperationFn{operations, numCounters, nil}
+	return &counterOperationFn{
+		Operations:  operations,
+		NumCounters: numCounters,
+	}
 }
 
 func (fn *counterOperationFn) Setup() {
diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go
index 6f7cb6f2d41..c57ed9f0b23 100644
--- a/sdks/go/test/load/sideinput/sideinput.go
+++ b/sdks/go/test/load/sideinput/sideinput.go
@@ -18,17 +18,20 @@ package main
 import (
 	"context"
 	"flag"
-	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"github.com/apache/beam/sdks/v2/go/test/load"
 )
 
 func init() {
-	beam.RegisterDoFn(reflect.TypeOf((*doFn)(nil)))
+	register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*iterSideInputFn)(nil))
+	register.Emitter2[[]byte, []byte]()
+	register.Iter2[[]byte, []byte]()
+	register.Function2x0(impToKV)
 }
 
 var (
@@ -51,11 +54,17 @@ func parseSyntheticConfig() synthetic.SourceConfig {
 	}
 }
 
-type doFn struct {
+// impToKV turns an impulse signal into a KV, which avoids adding
+// a single-value input version of RuntimeMonitor.
+func impToKV(imp []byte, emit func([]byte, []byte)) {
+	emit(imp, imp)
+}
+
+type iterSideInputFn struct {
 	ElementsToAccess int64
 }
 
-func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
+func (fn *iterSideInputFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) {
 	var key, value []byte
 	var i int64
 	for values(&key, &value) {
@@ -74,18 +83,21 @@ func main() {
 	p, s := beam.NewPipelineWithRoot()
 
 	syntheticConfig := parseSyntheticConfig()
-	elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100)
+	elementsToAccess := syntheticConfig.NumElements * int64(float64(*accessPercentage)/float64(100))
 
 	src := synthetic.SourceSingle(s, syntheticConfig)
-	src = beam.ParDo(s, &load.RuntimeMonitor{}, src)
 
-	src = beam.ParDo(
+	imp := beam.Impulse(s)
+	impKV := beam.ParDo(s, impToKV, imp)
+	monitored := beam.ParDo(s, &load.RuntimeMonitor{}, impKV)
+
+	useSide := beam.ParDo(
 		s,
-		&doFn{ElementsToAccess: elementsToAccess},
-		beam.Impulse(s),
+		&iterSideInputFn{ElementsToAccess: elementsToAccess},
+		monitored,
 		beam.SideInput{Input: src})
 
-	beam.ParDo(s, &load.RuntimeMonitor{}, src)
+	beam.ParDo(s, &load.RuntimeMonitor{}, useSide)
 
 	presult, err := beamx.RunWithMetrics(ctx, p)
 	if err != nil {
diff --git a/sdks/go/test/load/util.go b/sdks/go/test/load/util.go
index 96436d0d678..4cbfda8e0ba 100644
--- a/sdks/go/test/load/util.go
+++ b/sdks/go/test/load/util.go
@@ -24,12 +24,12 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"reflect"
 	"strings"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
 
 const (
@@ -60,7 +60,8 @@ var (
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem())
+	register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*RuntimeMonitor)(nil))
+	register.Emitter2[[]byte, []byte]()
 }
 
 // RuntimeMonitor is a DoFn to record processing time in the pipeline.
@@ -132,10 +133,16 @@ func newLoadTestResult(value float64) loadTestResult {
 // PublishMetrics calculates the runtime and sends the result to InfluxDB database.
 func PublishMetrics(results metrics.QueryResults) {
 	options := newInfluxDBOptions()
+	ress := toLoadTestResults(results)
+	for _, res := range ress {
+		log.Printf("%s %v", res.metric, time.Duration(float64(time.Second)*res.value))
+	}
+	if len(ress) == 0 {
+		log.Print("No metrics returned.")
+		return
+	}
 	if options.validate() {
-		if res := toLoadTestResults(results); len(res) > 0 {
-			publishMetricstoInfluxDB(options, toLoadTestResults(results))
-		}
+		publishMetricstoInfluxDB(options, ress)
 	} else {
 		log.Print("Missing InfluxDB options. Metrics will not be published to InfluxDB")
 	}
@@ -212,8 +219,8 @@ func publishMetricstoInfluxDB(options *influxDBOptions, results []loadTestResult
 	if resp.StatusCode != 204 {
 		jsonData := make(map[string]string)
 		json.Unmarshal(body, &jsonData)
-		log.Print(fmt.Errorf("Failed to publish metrics to InfluxDB. Received status code %v "+
-			"with an error message: %v", resp.StatusCode, jsonData["error"]))
+		log.Printf("Failed to publish metrics to InfluxDB. Received status code %v "+
+			"with an error message: %v", resp.StatusCode, jsonData["error"])
 	}
 }