You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/07/21 22:08:52 UTC
[beam] branch master updated: [prism] Fix top for unfused execution. Move to register. (#27585)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f0f6d3ba33b [prism] Fix top for unfused execution. Move to register. (#27585)
f0f6d3ba33b is described below
commit f0f6d3ba33b4df91085d85111ca10e76c9f95acc
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Jul 21 15:08:43 2023 -0700
[prism] Fix top for unfused execution. Move to register. (#27585)
* [prism] Fix top for unfused execution. Move to register.
* make encoding idempotent
---------
Co-authored-by: lostluck <13...@users.noreply.github.com>
---
sdks/go/pkg/beam/transforms/top/top.go | 15 +--
sdks/go/pkg/beam/transforms/top/top.shims.go | 185 ---------------------------
sdks/go/pkg/beam/transforms/top/top_test.go | 54 ++++----
3 files changed, 33 insertions(+), 221 deletions(-)
diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go
index f93786cd229..f82dc920aa8 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -29,14 +29,11 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
-//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
-//go:generate starcgen --package=top
-//go:generate go fmt
-
func init() {
- beam.RegisterDoFn(reflect.TypeOf((*combineFn)(nil)))
+ register.Combiner3[accum, beam.T, []beam.T]((*combineFn)(nil))
}
var (
@@ -157,10 +154,13 @@ func accumEnc() func(accum) ([]byte, error) {
panic(err)
}
return func(a accum) ([]byte, error) {
- if a.enc == nil {
- return nil, errors.Errorf("top.accum: element encoder unspecified")
+ if len(a.list) > 0 && a.enc == nil {
+ return nil, errors.Errorf("top.accum: element encoder unspecified with non-zero elements: %v data available", len(a.data))
}
var values [][]byte
+ if len(a.list) == 0 && len(a.data) > 0 {
+ values = a.data
+ }
for _, value := range a.list {
var buf bytes.Buffer
if err := a.enc.Encode(value, &buf); err != nil {
@@ -168,7 +168,6 @@ func accumEnc() func(accum) ([]byte, error) {
}
values = append(values, buf.Bytes())
}
- a.list = nil
var buf bytes.Buffer
if err := coder.WriteSimpleRowHeader(1, &buf); err != nil {
diff --git a/sdks/go/pkg/beam/transforms/top/top.shims.go b/sdks/go/pkg/beam/transforms/top/top.shims.go
deleted file mode 100644
index 687046dfc86..00000000000
--- a/sdks/go/pkg/beam/transforms/top/top.shims.go
+++ /dev/null
@@ -1,185 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Code generated by starcgen. DO NOT EDIT.
-// File: top.shims.go
-
-package top
-
-import (
- "reflect"
-
- // Library imports
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
-)
-
-func init() {
- runtime.RegisterType(reflect.TypeOf((*accum)(nil)).Elem())
- schema.RegisterType(reflect.TypeOf((*accum)(nil)).Elem())
- runtime.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
- schema.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
- reflectx.RegisterStructWrapper(reflect.TypeOf((*combineFn)(nil)).Elem(), wrapMakerCombineFn)
- reflectx.RegisterFunc(reflect.TypeOf((*func(accum, accum) accum)(nil)).Elem(), funcMakerAccumAccumГAccum)
- reflectx.RegisterFunc(reflect.TypeOf((*func(accum, typex.T) accum)(nil)).Elem(), funcMakerAccumTypex۰TГAccum)
- reflectx.RegisterFunc(reflect.TypeOf((*func(accum) []typex.T)(nil)).Elem(), funcMakerAccumГSliceOfTypex۰T)
- reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), funcMakerГ)
- reflectx.RegisterFunc(reflect.TypeOf((*func() accum)(nil)).Elem(), funcMakerГAccum)
-}
-
-func wrapMakerCombineFn(fn any) map[string]reflectx.Func {
- dfn := fn.(*combineFn)
- return map[string]reflectx.Func{
- "AddInput": reflectx.MakeFunc(func(a0 accum, a1 typex.T) accum { return dfn.AddInput(a0, a1) }),
- "CreateAccumulator": reflectx.MakeFunc(func() accum { return dfn.CreateAccumulator() }),
- "ExtractOutput": reflectx.MakeFunc(func(a0 accum) []typex.T { return dfn.ExtractOutput(a0) }),
- "MergeAccumulators": reflectx.MakeFunc(func(a0 accum, a1 accum) accum { return dfn.MergeAccumulators(a0, a1) }),
- "Setup": reflectx.MakeFunc(func() { dfn.Setup() }),
- }
-}
-
-type callerAccumAccumГAccum struct {
- fn func(accum, accum) accum
-}
-
-func funcMakerAccumAccumГAccum(fn any) reflectx.Func {
- f := fn.(func(accum, accum) accum)
- return &callerAccumAccumГAccum{fn: f}
-}
-
-func (c *callerAccumAccumГAccum) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumAccumГAccum) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumAccumГAccum) Call(args []any) []any {
- out0 := c.fn(args[0].(accum), args[1].(accum))
- return []any{out0}
-}
-
-func (c *callerAccumAccumГAccum) Call2x1(arg0, arg1 any) any {
- return c.fn(arg0.(accum), arg1.(accum))
-}
-
-type callerAccumTypex۰TГAccum struct {
- fn func(accum, typex.T) accum
-}
-
-func funcMakerAccumTypex۰TГAccum(fn any) reflectx.Func {
- f := fn.(func(accum, typex.T) accum)
- return &callerAccumTypex۰TГAccum{fn: f}
-}
-
-func (c *callerAccumTypex۰TГAccum) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumTypex۰TГAccum) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumTypex۰TГAccum) Call(args []any) []any {
- out0 := c.fn(args[0].(accum), args[1].(typex.T))
- return []any{out0}
-}
-
-func (c *callerAccumTypex۰TГAccum) Call2x1(arg0, arg1 any) any {
- return c.fn(arg0.(accum), arg1.(typex.T))
-}
-
-type callerAccumГSliceOfTypex۰T struct {
- fn func(accum) []typex.T
-}
-
-func funcMakerAccumГSliceOfTypex۰T(fn any) reflectx.Func {
- f := fn.(func(accum) []typex.T)
- return &callerAccumГSliceOfTypex۰T{fn: f}
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Call(args []any) []any {
- out0 := c.fn(args[0].(accum))
- return []any{out0}
-}
-
-func (c *callerAccumГSliceOfTypex۰T) Call1x1(arg0 any) any {
- return c.fn(arg0.(accum))
-}
-
-type callerГ struct {
- fn func()
-}
-
-func funcMakerГ(fn any) reflectx.Func {
- f := fn.(func())
- return &callerГ{fn: f}
-}
-
-func (c *callerГ) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГ) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГ) Call(args []any) []any {
- c.fn()
- return []any{}
-}
-
-func (c *callerГ) Call0x0() {
- c.fn()
-}
-
-type callerГAccum struct {
- fn func() accum
-}
-
-func funcMakerГAccum(fn any) reflectx.Func {
- f := fn.(func() accum)
- return &callerГAccum{fn: f}
-}
-
-func (c *callerГAccum) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerГAccum) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerГAccum) Call(args []any) []any {
- out0 := c.fn()
- return []any{out0}
-}
-
-func (c *callerГAccum) Call0x1() any {
- return c.fn()
-}
-
-// DO NOT MODIFY: GENERATED CODE
diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go
index bf641e6ec37..39d774a6630 100644
--- a/sdks/go/pkg/beam/transforms/top/top_test.go
+++ b/sdks/go/pkg/beam/transforms/top/top_test.go
@@ -22,17 +22,33 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
+func TestMain(m *testing.M) {
+ ptest.Main(m)
+}
+
+func init() {
+ register.Function2x2(addKeyFn)
+ register.Function2x1(lessInt)
+ register.Function2x1(shorterString)
+}
+
+func lessInt(a, b int) bool {
+ return a < b
+}
+
+func shorterString(a, b string) bool {
+ return len(a) < len(b)
+}
+
// TestCombineFn3String verifies that the accumulator correctly
// maintains the top 3 longest strings.
func TestCombineFn3String(t *testing.T) {
- less := func(a, b string) bool {
- return len(a) < len(b)
- }
- fn := newCombineFn(less, 3, reflectx.String, false)
+ fn := newCombineFn(shorterString, 3, reflectx.String, false)
tests := []struct {
Elms []string
@@ -57,10 +73,7 @@ func TestCombineFn3String(t *testing.T) {
// TestCombineFn3RevString verifies that the accumulator correctly
// maintains the top 3 shortest strings.
func TestCombineFn3RevString(t *testing.T) {
- less := func(a, b string) bool {
- return len(a) < len(b)
- }
- fn := newCombineFn(less, 3, reflectx.String, true)
+ fn := newCombineFn(shorterString, 3, reflectx.String, true)
tests := []struct {
Elms []string
@@ -86,10 +99,7 @@ func TestCombineFn3RevString(t *testing.T) {
// extractOutput still works on the marshalled accumulators it receives after
// merging.
func TestCombineFnMerge(t *testing.T) {
- less := func(a, b string) bool {
- return len(a) < len(b)
- }
- fn := newCombineFn(less, 3, reflectx.String, false)
+ fn := newCombineFn(shorterString, 3, reflectx.String, false)
tests := []struct {
Elms [][]string
Expected []string
@@ -170,12 +180,9 @@ func output(fn *combineFn, a accum) []string {
// TestLargest checks that the Largest transform outputs the correct elements
// for a given PCollection of ints and a comparator function.
func TestLargest(t *testing.T) {
- less := func(a, b int) bool {
- return a < b
- }
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, 1, 11, 7, 5, 10)
- topTwo := Largest(s, col, 2, less)
+ topTwo := Largest(s, col, 2, lessInt)
passert.Equals(s, topTwo, []int{11, 10})
if err := ptest.Run(p); err != nil {
t.Errorf("pipeline failed but should have succeeded, got %v", err)
@@ -185,12 +192,9 @@ func TestLargest(t *testing.T) {
// TestSmallest checks that the Smallest transform outputs the correct elements
// for a given PCollection of ints and a comparator function.
func TestSmallest(t *testing.T) {
- less := func(a, b int) bool {
- return a < b
- }
p, s := beam.NewPipelineWithRoot()
col := beam.Create(s, 1, 11, 7, 5, 10)
- botTwo := Smallest(s, col, 2, less)
+ botTwo := Smallest(s, col, 2, lessInt)
passert.Equals(s, botTwo, []int{1, 5})
if err := ptest.Run(p); err != nil {
t.Errorf("pipeline failed but should have succeeded, got %v", err)
@@ -209,9 +213,6 @@ func addKeyFn(elm beam.T, newKey int) (int, beam.T) {
// TestLargestPerKey ensures that the LargestPerKey transform outputs the proper
// collection for a PCollection of type <int, int>.
func TestLargestPerKey(t *testing.T) {
- less := func(a, b int) bool {
- return a < b
- }
p, s := beam.NewPipelineWithRoot()
colZero := beam.Create(s, 1, 11, 7, 5, 10)
keyedZero := addKey(s, colZero, 0)
@@ -220,7 +221,7 @@ func TestLargestPerKey(t *testing.T) {
keyedOne := addKey(s, colOne, 1)
col := beam.Flatten(s, keyedZero, keyedOne)
- top := LargestPerKey(s, col, 2, less)
+ top := LargestPerKey(s, col, 2, lessInt)
out := beam.DropKey(s, top)
passert.Equals(s, out, []int{11, 10}, []int{12, 11})
if err := ptest.Run(p); err != nil {
@@ -231,9 +232,6 @@ func TestLargestPerKey(t *testing.T) {
// TestSmallestPerKey ensures that the SmallestPerKey transform outputs the proper
// collection for a PCollection of type <int, int>.
func TestSmallestPerKey(t *testing.T) {
- less := func(a, b int) bool {
- return a < b
- }
p, s := beam.NewPipelineWithRoot()
colZero := beam.Create(s, 1, 11, 7, 5, 10)
keyedZero := addKey(s, colZero, 0)
@@ -242,7 +240,7 @@ func TestSmallestPerKey(t *testing.T) {
keyedOne := addKey(s, colOne, 1)
col := beam.Flatten(s, keyedZero, keyedOne)
- bot := SmallestPerKey(s, col, 2, less)
+ bot := SmallestPerKey(s, col, 2, lessInt)
out := beam.DropKey(s, bot)
passert.Equals(s, out, []int{1, 5}, []int{2, 6})
if err := ptest.Run(p); err != nil {