You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/07/21 22:08:52 UTC

[beam] branch master updated: [prism] Fix top for unfused execution. Move to register. (#27585)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f6d3ba33b [prism] Fix top for unfused execution. Move to register. (#27585)
f0f6d3ba33b is described below

commit f0f6d3ba33b4df91085d85111ca10e76c9f95acc
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Jul 21 15:08:43 2023 -0700

    [prism] Fix top for unfused execution. Move to register. (#27585)
    
    * [prism] Fix top for unfused execution. Move to register.
    
    * make encoding idempotent
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/transforms/top/top.go       |  15 +--
 sdks/go/pkg/beam/transforms/top/top.shims.go | 185 ---------------------------
 sdks/go/pkg/beam/transforms/top/top_test.go  |  54 ++++----
 3 files changed, 33 insertions(+), 221 deletions(-)

diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go
index f93786cd229..f82dc920aa8 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -29,14 +29,11 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 )
 
-//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
-//go:generate starcgen --package=top
-//go:generate go fmt
-
 func init() {
-	beam.RegisterDoFn(reflect.TypeOf((*combineFn)(nil)))
+	register.Combiner3[accum, beam.T, []beam.T]((*combineFn)(nil))
 }
 
 var (
@@ -157,10 +154,13 @@ func accumEnc() func(accum) ([]byte, error) {
 		panic(err)
 	}
 	return func(a accum) ([]byte, error) {
-		if a.enc == nil {
-			return nil, errors.Errorf("top.accum: element encoder unspecified")
+		if len(a.list) > 0 && a.enc == nil {
+			return nil, errors.Errorf("top.accum: element encoder unspecified with non-zero elements: %v data available", len(a.data))
 		}
 		var values [][]byte
+		if len(a.list) == 0 && len(a.data) > 0 {
+			values = a.data
+		}
 		for _, value := range a.list {
 			var buf bytes.Buffer
 			if err := a.enc.Encode(value, &buf); err != nil {
@@ -168,7 +168,6 @@ func accumEnc() func(accum) ([]byte, error) {
 			}
 			values = append(values, buf.Bytes())
 		}
-		a.list = nil
 
 		var buf bytes.Buffer
 		if err := coder.WriteSimpleRowHeader(1, &buf); err != nil {
diff --git a/sdks/go/pkg/beam/transforms/top/top.shims.go b/sdks/go/pkg/beam/transforms/top/top.shims.go
deleted file mode 100644
index 687046dfc86..00000000000
--- a/sdks/go/pkg/beam/transforms/top/top.shims.go
+++ /dev/null
@@ -1,185 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Code generated by starcgen. DO NOT EDIT.
-// File: top.shims.go
-
-package top
-
-import (
-	"reflect"
-
-	// Library imports
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
-)
-
-func init() {
-	runtime.RegisterType(reflect.TypeOf((*accum)(nil)).Elem())
-	schema.RegisterType(reflect.TypeOf((*accum)(nil)).Elem())
-	runtime.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
-	schema.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
-	reflectx.RegisterStructWrapper(reflect.TypeOf((*combineFn)(nil)).Elem(), wrapMakerCombineFn)
-	reflectx.RegisterFunc(reflect.TypeOf((*func(accum, accum) accum)(nil)).Elem(), funcMakerAccumAccumГAccum)
-	reflectx.RegisterFunc(reflect.TypeOf((*func(accum, typex.T) accum)(nil)).Elem(), funcMakerAccumTypex۰TГAccum)
-	reflectx.RegisterFunc(reflect.TypeOf((*func(accum) []typex.T)(nil)).Elem(), funcMakerAccumГSliceOfTypex۰T)
-	reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), funcMakerГ)
-	reflectx.RegisterFunc(reflect.TypeOf((*func() accum)(nil)).Elem(), funcMakerГAccum)
-}
-
-func wrapMakerCombineFn(fn any) map[string]reflectx.Func {
-	dfn := fn.(*combineFn)
-	return map[string]reflectx.Func{
-		"AddInput":          reflectx.MakeFunc(func(a0 accum, a1 typex.T) accum { return dfn.AddInput(a0, a1) }),
-		"CreateAccumulator": reflectx.MakeFunc(func() accum { return dfn.CreateAccumulator() }),
-		"ExtractOutput":     reflectx.MakeFunc(func(a0 accum) []typex.T { return dfn.ExtractOutput(a0) }),
-		"MergeAccumulators": reflectx.MakeFunc(func(a0 accum, a1 accum) accum { return dfn.MergeAccumulators(a0, a1) }),
-		"Setup":             reflectx.MakeFunc(func() { dfn.Setup() }),
-	}
-}
-
-type callerAccumAccumГAccum struct {
-	fn func(accum, accum) accum
-}
-
-func funcMakerAccumAccumГAccum(fn any) reflectx.Func {
-	f := fn.(func(accum, accum) accum)
-	return &callerAccumAccumГAccum{fn: f}
-}
-
-func (c *callerAccumAccumГAccum) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumAccumГAccum) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumAccumГAccum) Call(args []any) []any {
-	out0 := c.fn(args[0].(accum), args[1].(accum))
-	return []any{out0}
-}
-
-func (c *callerAccumAccumГAccum) Call2x1(arg0, arg1 any) any {
-	return c.fn(arg0.(accum), arg1.(accum))
-}
-
-type callerAccumTypex۰TГAccum struct {
-	fn func(accum, typex.T) accum
-}
-
-func funcMakerAccumTypex۰TГAccum(fn any) reflectx.Func {
-	f := fn.(func(accum, typex.T) accum)
-	return &callerAccumTypex۰TГAccum{fn: f}
-}
-
-func (c *callerAccumTypex۰TГAccum) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumTypex۰TГAccum) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumTypex۰TГAccum) Call(args []any) []any {
-	out0 := c.fn(args[0].(accum), args[1].(typex.T))
-	return []any{out0}
-}
-
-func (c *callerAccumTypex۰TГAccum) Call2x1(arg0, arg1 any) any {
-	return c.fn(arg0.(accum), arg1.(typex.T))
-}
-
-type callerAccumГSliceOfTypex۰T struct {
-	fn func(accum) []typex.T
-}
-
-func funcMakerAccumГSliceOfTypex۰T(fn any) reflectx.Func {
-	f := fn.(func(accum) []typex.T)
-	return &callerAccumГSliceOfTypex۰T{fn: f}
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Call(args []any) []any {
-	out0 := c.fn(args[0].(accum))
-	return []any{out0}
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Call1x1(arg0 any) any {
-	return c.fn(arg0.(accum))
-}
-
-type callerГ struct {
-	fn func()
-}
-
-func funcMakerГ(fn any) reflectx.Func {
-	f := fn.(func())
-	return &callerГ{fn: f}
-}
-
-func (c *callerГ) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГ) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГ) Call(args []any) []any {
-	c.fn()
-	return []any{}
-}
-
-func (c *callerГ) Call0x0() {
-	c.fn()
-}
-
-type callerГAccum struct {
-	fn func() accum
-}
-
-func funcMakerГAccum(fn any) reflectx.Func {
-	f := fn.(func() accum)
-	return &callerГAccum{fn: f}
-}
-
-func (c *callerГAccum) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГAccum) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГAccum) Call(args []any) []any {
-	out0 := c.fn()
-	return []any{out0}
-}
-
-func (c *callerГAccum) Call0x1() any {
-	return c.fn()
-}
-
-// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go
index bf641e6ec37..39d774a6630 100644
--- a/sdks/go/pkg/beam/transforms/top/top_test.go
+++ b/sdks/go/pkg/beam/transforms/top/top_test.go
@@ -22,17 +22,33 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
 )
 
+func TestMain(m *testing.M) {
+	ptest.Main(m)
+}
+
+func init() {
+	register.Function2x2(addKeyFn)
+	register.Function2x1(lessInt)
+	register.Function2x1(shorterString)
+}
+
+func lessInt(a, b int) bool {
+	return a < b
+}
+
+func shorterString(a, b string) bool {
+	return len(a) < len(b)
+}
+
 // TestCombineFn3String verifies that the accumulator correctly
 // maintains the top 3 longest strings.
 func TestCombineFn3String(t *testing.T) {
-	less := func(a, b string) bool {
-		return len(a) < len(b)
-	}
-	fn := newCombineFn(less, 3, reflectx.String, false)
+	fn := newCombineFn(shorterString, 3, reflectx.String, false)
 
 	tests := []struct {
 		Elms     []string
@@ -57,10 +73,7 @@ func TestCombineFn3String(t *testing.T) {
 // TestCombineFn3RevString verifies that the accumulator correctly
 // maintains the top 3 shortest strings.
 func TestCombineFn3RevString(t *testing.T) {
-	less := func(a, b string) bool {
-		return len(a) < len(b)
-	}
-	fn := newCombineFn(less, 3, reflectx.String, true)
+	fn := newCombineFn(shorterString, 3, reflectx.String, true)
 
 	tests := []struct {
 		Elms     []string
@@ -86,10 +99,7 @@ func TestCombineFn3RevString(t *testing.T) {
 // extractOutput still works on the marshalled accumulators it receives after
 // merging.
 func TestCombineFnMerge(t *testing.T) {
-	less := func(a, b string) bool {
-		return len(a) < len(b)
-	}
-	fn := newCombineFn(less, 3, reflectx.String, false)
+	fn := newCombineFn(shorterString, 3, reflectx.String, false)
 	tests := []struct {
 		Elms     [][]string
 		Expected []string
@@ -170,12 +180,9 @@ func output(fn *combineFn, a accum) []string {
 // TestLargest checks that the Largest transform outputs the correct elements
 // for a given PCollection of ints and a comparator function.
 func TestLargest(t *testing.T) {
-	less := func(a, b int) bool {
-		return a < b
-	}
 	p, s := beam.NewPipelineWithRoot()
 	col := beam.Create(s, 1, 11, 7, 5, 10)
-	topTwo := Largest(s, col, 2, less)
+	topTwo := Largest(s, col, 2, lessInt)
 	passert.Equals(s, topTwo, []int{11, 10})
 	if err := ptest.Run(p); err != nil {
 		t.Errorf("pipeline failed but should have succeeded, got %v", err)
@@ -185,12 +192,9 @@ func TestLargest(t *testing.T) {
 // TestSmallest checks that the Smallest transform outputs the correct elements
 // for a given PCollection of ints and a comparator function.
 func TestSmallest(t *testing.T) {
-	less := func(a, b int) bool {
-		return a < b
-	}
 	p, s := beam.NewPipelineWithRoot()
 	col := beam.Create(s, 1, 11, 7, 5, 10)
-	botTwo := Smallest(s, col, 2, less)
+	botTwo := Smallest(s, col, 2, lessInt)
 	passert.Equals(s, botTwo, []int{1, 5})
 	if err := ptest.Run(p); err != nil {
 		t.Errorf("pipeline failed but should have succeeded, got %v", err)
@@ -209,9 +213,6 @@ func addKeyFn(elm beam.T, newKey int) (int, beam.T) {
 // TestLargestPerKey ensures that the LargestPerKey transform outputs the proper
 // collection for a PCollection of type <int, int>.
 func TestLargestPerKey(t *testing.T) {
-	less := func(a, b int) bool {
-		return a < b
-	}
 	p, s := beam.NewPipelineWithRoot()
 	colZero := beam.Create(s, 1, 11, 7, 5, 10)
 	keyedZero := addKey(s, colZero, 0)
@@ -220,7 +221,7 @@ func TestLargestPerKey(t *testing.T) {
 	keyedOne := addKey(s, colOne, 1)
 
 	col := beam.Flatten(s, keyedZero, keyedOne)
-	top := LargestPerKey(s, col, 2, less)
+	top := LargestPerKey(s, col, 2, lessInt)
 	out := beam.DropKey(s, top)
 	passert.Equals(s, out, []int{11, 10}, []int{12, 11})
 	if err := ptest.Run(p); err != nil {
@@ -231,9 +232,6 @@ func TestLargestPerKey(t *testing.T) {
 // TestSmallestPerKey ensures that the SmallestPerKey transform outputs the proper
 // collection for a PCollection of type <int, int>.
 func TestSmallestPerKey(t *testing.T) {
-	less := func(a, b int) bool {
-		return a < b
-	}
 	p, s := beam.NewPipelineWithRoot()
 	colZero := beam.Create(s, 1, 11, 7, 5, 10)
 	keyedZero := addKey(s, colZero, 0)
@@ -242,7 +240,7 @@ func TestSmallestPerKey(t *testing.T) {
 	keyedOne := addKey(s, colOne, 1)
 
 	col := beam.Flatten(s, keyedZero, keyedOne)
-	bot := SmallestPerKey(s, col, 2, less)
+	bot := SmallestPerKey(s, col, 2, lessInt)
 	out := beam.DropKey(s, bot)
 	passert.Equals(s, out, []int{1, 5}, []int{2, 6})
 	if err := ptest.Run(p); err != nil {