Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/05/02 23:05:00 UTC
[jira] [Work logged] (BEAM-14347) [Go SDK] Allow users to optimize DoFns with a single generic registration function
[ https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=765192&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765192 ]
ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/May/22 23:04
Start Date: 02/May/22 23:04
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r863219458
##########
.github/workflows/go_tests.yml:
##########
@@ -39,7 +39,7 @@ jobs:
fetch-depth: 2
- uses: actions/setup-go@v2
with:
- go-version: '1.16'
+ go-version: '1.18'
Review Comment:
I'm guessing there's no easy or good way to have a constants/ENV file for GitHub Actions that all the workflows can share, so that we can update this in a single place?
##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,130 @@ func upto(i int) []int {
}
return ret
}
+
+func add(i int, j int) int {
+ return i + j
+}
+
+func mult(i int, j int) int {
+ return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+ dict := make(map[string]interface{}, len(values)/2)
+ if len(values)%2 != 0 {
+ panic("Invalid dictionary call")
+ }
+ for i := 0; i < len(values); i += 2 {
+ dict[values[i].(string)] = values[i+1]
+ }
+
+ return dict
+}
+
+func list(values ...string) []string {
+ return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+ seenElements := false
+ typing := ""
+ if in > 0 {
+ typing += fmt.Sprintf("[I%v", 0)
+ for i := 1; i < in; i++ {
+ typing += fmt.Sprintf(", I%v", i)
+ }
+ seenElements = true
+ }
+ if out > 0 {
+ i := 0
+ if !seenElements {
+ typing += fmt.Sprintf("[R%v", 0)
+ i++
+ }
+ for i < out {
+ typing += fmt.Sprintf(", R%v", i)
+ i++
+ }
+ seenElements = true
+ }
+
+ if seenElements {
+ if includeType {
+ typing += " any"
+ }
+ typing += "]"
+ }
+
+ return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string {
+ numIn := numInInterface.(int)
+ processElementIn := processElementInInterface.(int)
+ ordered_known_parameter_options := []string{"context.Context", "typex.PaneInfo", "[]typex.Window", "typex.EventTime", "typex.BundleFinalization"}
+ // Because of how Bundle lifecycle functions are invoked, all known parameters must preced unknown options and be in order.
Review Comment:
```suggestion
// Because of how Bundle lifecycle functions are invoked, all known parameters must precede unknown options and be in order.
```
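The ordering rule in that code comment (known parameters first, in their listed order, then the unknown ones) can be made concrete with a small enumeration. This is a minimal sketch, not the PR's implementation: `knownPrefixes` and `combos` are hypothetical helpers. Because the body of `possibleBundleLifecycleParameterCombos` is not part of this excerpt, filling the unknown slots with the placeholders `I0`, `I1`, ... is an assumption.

```go
package main

import "fmt"

// orderedKnownParams mirrors ordered_known_parameter_options from the diff.
var orderedKnownParams = []string{
	"context.Context",
	"typex.PaneInfo",
	"[]typex.Window",
	"typex.EventTime",
	"typex.BundleFinalization",
}

// knownPrefixes returns every in-order subsequence of orderedKnownParams with
// exactly k elements. Each result is a candidate run of known parameters that
// would lead a lifecycle method's signature.
func knownPrefixes(k int) [][]string {
	var out [][]string
	var walk func(start int, cur []string)
	walk = func(start int, cur []string) {
		if len(cur) == k {
			// Copy so later appends cannot overwrite a stored result.
			out = append(out, append([]string(nil), cur...))
			return
		}
		for i := start; i < len(orderedKnownParams); i++ {
			walk(i+1, append(cur, orderedKnownParams[i]))
		}
	}
	walk(0, nil)
	return out
}

// combos builds candidate parameter lists of length numIn: a known prefix of
// length k followed by numIn-k placeholder types (hypothetical "I0", "I1", ...).
func combos(numIn int) [][]string {
	var out [][]string
	for k := 0; k <= numIn; k++ {
		for _, prefix := range knownPrefixes(k) {
			sig := append([]string(nil), prefix...)
			for j := 0; j < numIn-k; j++ {
				sig = append(sig, fmt.Sprintf("I%d", j))
			}
			out = append(out, sig)
		}
	}
	return out
}
```

For two parameters this yields 16 candidates: one with no known parameter, five with one, and ten with two.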
##########
sdks/go/pkg/beam/registration/registration_test.go:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/specialize
+//go:generate specialize --package=beam --input=registration.tmpl --x=data,universals --imports=typex
+//go:generate go fmt
Review Comment:
I'd prefer if this weren't in a test file. I think we can move it to a `doc.go` file, and that way we can have a proper package comment explaining how it works.
Also, it should be `--package=registration`, not `beam`.
We could also add a whole-file testable example alongside it to demonstrate the package's use.
https://go.dev/blog/examples#:~:text=Larger%20examples
##########
sdks/go/pkg/beam/registration/registration_test.go:
##########
@@ -0,0 +1,218 @@
+
+package beam
Review Comment:
```suggestion
package registration
```
##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,130 @@ func upto(i int) []int {
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string {
Review Comment:
I would recommend adding a test for this to this package/binary. Tests can be added to `package main` too.
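The body of `possibleBundleLifecycleParameterCombos` is cut off in this excerpt, so the sketch below applies the same table-driven pattern to `genericTypingRepresentation`, whose full body appears in the diff. The expected strings were derived by tracing that function by hand; `typingCases` and `typingFailures` are hypothetical names.

```go
package main

import "fmt"

// genericTypingRepresentation is copied unchanged from the diff under review.
func genericTypingRepresentation(in int, out int, includeType bool) string {
	seenElements := false
	typing := ""
	if in > 0 {
		typing += fmt.Sprintf("[I%v", 0)
		for i := 1; i < in; i++ {
			typing += fmt.Sprintf(", I%v", i)
		}
		seenElements = true
	}
	if out > 0 {
		i := 0
		if !seenElements {
			typing += fmt.Sprintf("[R%v", 0)
			i++
		}
		for i < out {
			typing += fmt.Sprintf(", R%v", i)
			i++
		}
		seenElements = true
	}
	if seenElements {
		if includeType {
			typing += " any"
		}
		typing += "]"
	}
	return typing
}

// typingCases pairs inputs with the output expected from tracing the code.
var typingCases = []struct {
	in, out     int
	includeType bool
	want        string
}{
	{0, 0, true, ""},
	{2, 0, true, "[I0, I1 any]"},
	{0, 1, false, "[R0]"},
	{0, 2, false, "[R0, R1]"},
	{1, 2, true, "[I0, R0, R1 any]"},
}

// typingFailures runs every case and describes each mismatch.
func typingFailures() []string {
	var failures []string
	for _, c := range typingCases {
		if got := genericTypingRepresentation(c.in, c.out, c.includeType); got != c.want {
			failures = append(failures, fmt.Sprintf("(%d, %d, %v) = %q, want %q",
				c.in, c.out, c.includeType, got, c.want))
		}
	}
	return failures
}
```

In a real `main_test.go` (which can also declare `package main`), the loop would live in a `func TestGenericTypingRepresentation(t *testing.T)` and report mismatches with `t.Errorf`.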
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {
+ {{$lowerName}}In := -1
+ {{$lowerName}}Out := -1
+ {{$lowerName}}Method := reflect.ValueOf(doFn).MethodByName("{{$upperName}}")
+ if {{$lowerName}}Method.IsValid() {
+ {{$lowerName}}In = {{$lowerName}}Method.Type().NumIn()
+ {{$lowerName}}Out = {{$lowerName}}Method.Type().NumOut()
+ switch {
+{{range $funcIn := upto $startFinishBundleMaxIn}}
+ case {{$lowerName}}In == {{$funcIn}}:
+ switch { {{range $funcOut := upto 2}}{{$possibleCombos := (possibleBundleLifecycleParameterCombos $funcIn $numParams)}}{{if $possibleCombos}}
+ case {{$lowerName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first = false}} {{else}} else {{end}}if _, ok := doFn.({{$lowerName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); ok {
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), func(fn interface{}) reflectx.Func { return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}{fn: fn.(func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})} })
+ {{$funcRegister := (makeStructRegisterEntry $lowerName $upperName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = (makeStructRegisterEntry $lowerName $upperName $funcCombo (list "error"))}}{{end}}
+ return func(fn interface{}) reflectx.Func { return {{$funcRegister}} }
Review Comment:
This block, which wraps into caller generation, feels like an opportunity to reduce lines. If we add one further layer of generic helpers (0x0 through 7x1) for the caller generation and wrapping, we could save 6 lines per invocation.
Defining them should take 10 lines each, including the blank spacer line. With 14 helpers shared between StartBundle and FinishBundle, that's 140 new lines, but since this block appears around 2600 times, we'd still save around 16k lines.
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {
+ {{$lowerName}}In := -1
+ {{$lowerName}}Out := -1
+ {{$lowerName}}Method := reflect.ValueOf(doFn).MethodByName("{{$upperName}}")
+ if {{$lowerName}}Method.IsValid() {
+ {{$lowerName}}In = {{$lowerName}}Method.Type().NumIn()
+ {{$lowerName}}Out = {{$lowerName}}Method.Type().NumOut()
+ switch {
+{{range $funcIn := upto $startFinishBundleMaxIn}}
+ case {{$lowerName}}In == {{$funcIn}}:
+ switch { {{range $funcOut := upto 2}}{{$possibleCombos := (possibleBundleLifecycleParameterCombos $funcIn $numParams)}}{{if $possibleCombos}}
+ case {{$lowerName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first = false}} {{else}} else {{end}}if _, ok := doFn.({{$lowerName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); ok {
+ reflectx.RegisterFunc(reflect.TypeOf((*func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), func(fn interface{}) reflectx.Func { return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}{fn: fn.(func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})} })
+ {{$funcRegister := (makeStructRegisterEntry $lowerName $upperName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = (makeStructRegisterEntry $lowerName $upperName $funcCombo (list "error"))}}{{end}}
+ return func(fn interface{}) reflectx.Func { return {{$funcRegister}} }
+ } {{end}}{{end}}{{end}}
+ default:
+ panic("Invalid signature for {{$upperName}}")
+ }
+{{end}}
+ default:
+ panic("Invalid signature for {{$upperName}}")
+ }
+ }
+ return nil
+}
+{{end}}{{end}}{{define "BuildWrapper_SetupTeardown"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "setup")}}{{$lowerName = "setup"}}{{$upperName = "Setup"}}{{end}}{{if (eq .func "teardown")}}{{$lowerName = "teardown"}}{{$upperName = "Teardown"}}{{end}}
+func build{{$upperName}}Wrapper(doFn interface{}) func(interface{}) reflectx.Func {
+ if _, ok := doFn.({{$lowerName}}0x0); ok {
+ {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func())
+ return &caller0x0{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), {{$lowerName}}Caller)
+
+ return func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() {
+ fn.({{$lowerName}}0x0).{{$upperName}}()
+ })
+ }
+ } else if _, ok := doFn.({{$lowerName}}1x0); ok {
+ {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context))
+ return &caller1x0[context.Context]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), {{$lowerName}}Caller)
+
+ return func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) {
+ fn.({{$lowerName}}1x0).{{$upperName}}(a0)
+ })
+ }
+ } else if _, ok := doFn.({{$lowerName}}0x1); ok {
+ {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func() error)
+ return &caller0x1[error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), {{$lowerName}}Caller)
+
+ return func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func() error {
+ return fn.({{$lowerName}}0x1).{{$upperName}}()
+ })
+ }
+ } else if _, ok := doFn.({{$lowerName}}1x1); ok {
+ {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+ f := fn.(func(context.Context) error)
+ return &caller1x1[context.Context, error]{fn: f}
+ }
+ reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) error)(nil)).Elem(), {{$lowerName}}Caller)
+
+ return func(fn interface{}) reflectx.Func {
+ return reflectx.MakeFunc(func(a0 context.Context) error {
+ return fn.({{$lowerName}}1x1).{{$upperName}}(a0)
+ })
+ }
+ }
+
+ return nil
+}
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
Review Comment:
```suggestion
package registration
```
##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {
Review Comment:
The vast bulk of the generated file handles StartBundle and FinishBundle. I'm curious whether we can somehow modify what we have as `buildStartBundleWrapper9`, passing in the parameter cap, and populate the unused types with `int` or `struct{}`.
(At least the raw file is viewable, which is nice: https://raw.githubusercontent.com/apache/beam/9490267e52bb8f50a829f8c683db0beebc0b089a/sdks/go/pkg/beam/registration/registration.go)
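The padding idea can be sketched with a cap of three type parameters instead of nine to keep it short. `startBundleTakes`, `startBundleSlot`, and `prefixFn` are hypothetical; the sketch only shows that one capped generic function can serve DoFns with fewer ProcessElement parameters when the unused slots are filled with `struct{}`.

```go
package main

// startBundleTakes matches a DoFn whose StartBundle accepts a single T.
type startBundleTakes[T any] interface{ StartBundle(T) }

// startBundleSlot reports which capped type parameter StartBundle accepts,
// or -1 if none. Unused slots are padded with struct{} by the caller; if
// padding makes two cases identical, Go picks the first matching case.
func startBundleSlot[I0, I1, I2 any](doFn any) int {
	switch doFn.(type) {
	case startBundleTakes[I0]:
		return 0
	case startBundleTakes[I1]:
		return 1
	case startBundleTakes[I2]:
		return 2
	}
	return -1
}

// prefixFn is a hypothetical DoFn whose StartBundle takes a string.
type prefixFn struct{}

func (prefixFn) StartBundle(prefix string) {}
```

One caveat: a padding type can collide with a real parameter type. A DoFn whose StartBundle genuinely takes `int` would match an `int`-padded slot, so `struct{}` (or an unexported sentinel type) seems the safer filler.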
Issue Time Tracking
-------------------
Worklog Id: (was: 765192)
Time Spent: 3h 10m (was: 3h)
> [Go SDK] Allow users to optimize DoFns with a single generic registration function
> ----------------------------------------------------------------------------------
>
> Key: BEAM-14347
> URL: https://issues.apache.org/jira/browse/BEAM-14347
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Danny McCormick
> Assignee: Danny McCormick
> Priority: P2
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. This change allows them to use generics instead.
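As an illustration of that goal, the sketch below registers a statically typed invoker through type parameters rather than generated code. It is not the API this PR adds: `DoFn1x1`, `invokers`, `invoke`, and `lengthFn` are hypothetical, and a real implementation would hook into the SDK's own registries and cover many more arities.

```go
package main

import (
	"fmt"
	"reflect"
)

// processElement1x1 matches DoFns with ProcessElement(I0) R0.
type processElement1x1[I0, R0 any] interface {
	ProcessElement(I0) R0
}

// invoker calls ProcessElement on a DoFn without reflect.Value.Call.
type invoker func(doFn any, in any) any

// invokers is a hypothetical registry keyed by DoFn type.
var invokers = map[reflect.Type]invoker{}

// DoFn1x1 registers a statically typed invoker for T; the type parameters
// carry the signature, so no generated code is needed.
func DoFn1x1[T processElement1x1[I0, R0], I0, R0 any]() {
	t := reflect.TypeOf((*T)(nil)).Elem()
	invokers[t] = func(doFn any, in any) any {
		return doFn.(T).ProcessElement(in.(I0))
	}
}

// invoke looks up the registered invoker for doFn's dynamic type.
func invoke(doFn any, in any) (any, error) {
	inv, ok := invokers[reflect.TypeOf(doFn)]
	if !ok {
		return nil, fmt.Errorf("no invoker registered for %T", doFn)
	}
	return inv(doFn, in), nil
}

// lengthFn is a hypothetical DoFn used to demonstrate registration.
type lengthFn struct{}

func (lengthFn) ProcessElement(s string) int { return len(s) }
```

Usage: call `DoFn1x1[lengthFn, string, int]()` once at init time; afterwards `invoke(lengthFn{}, "beam")` returns `4` through the typed closure.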
--
This message was sent by Atlassian Jira
(v8.20.7#820007)