Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/05/02 23:05:00 UTC

[jira] [Work logged] (BEAM-14347) [Go SDK] Allow users to optimize DoFns with a single generic registration function

     [ https://issues.apache.org/jira/browse/BEAM-14347?focusedWorklogId=765192&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765192 ]

ASF GitHub Bot logged work on BEAM-14347:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 23:04
            Start Date: 02/May/22 23:04
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r863219458


##########
.github/workflows/go_tests.yml:
##########
@@ -39,7 +39,7 @@ jobs:
           fetch-depth: 2
       - uses: actions/setup-go@v2
         with:
-          go-version: '1.16'
+          go-version: '1.18'

Review Comment:
   I'm guessing there's no easy/good way to have a constants/ENV file for GitHub Actions that we can share with everything, so we can update this in a single place?



##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,130 @@ func upto(i int) []int {
 	}
 	return ret
 }
+
+func add(i int, j int) int {
+	return i + j
+}
+
+func mult(i int, j int) int {
+	return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+	dict := make(map[string]interface{}, len(values)/2)
+	if len(values)%2 != 0 {
+		panic("Invalid dictionary call")
+	}
+	for i := 0; i < len(values); i += 2 {
+		dict[values[i].(string)] = values[i+1]
+	}
+
+	return dict
+}
+
+func list(values ...string) []string {
+	return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+	seenElements := false
+	typing := ""
+	if in > 0 {
+		typing += fmt.Sprintf("[I%v", 0)
+		for i := 1; i < in; i++ {
+			typing += fmt.Sprintf(", I%v", i)
+		}
+		seenElements = true
+	}
+	if out > 0 {
+		i := 0
+		if !seenElements {
+			typing += fmt.Sprintf("[R%v", 0)
+			i++
+		}
+		for i < out {
+			typing += fmt.Sprintf(", R%v", i)
+			i++
+		}
+		seenElements = true
+	}
+
+	if seenElements {
+		if includeType {
+			typing += " any"
+		}
+		typing += "]"
+	}
+
+	return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string {
+	numIn := numInInterface.(int)
+	processElementIn := processElementInInterface.(int)
+	ordered_known_parameter_options := []string{"context.Context", "typex.PaneInfo", "[]typex.Window", "typex.EventTime", "typex.BundleFinalization"}
+	// Because of how Bundle lifecycle functions are invoked, all known parameters must preced unknown options and be in order.

Review Comment:
   ```suggestion
   	// Because of how Bundle lifecycle functions are invoked, all known parameters must precede unknown options and be in order.
   ```



##########
sdks/go/pkg/beam/registration/registration_test.go:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/specialize
+//go:generate specialize --package=beam --input=registration.tmpl --x=data,universals --imports=typex
+//go:generate go fmt

Review Comment:
   I'd prefer if this weren't in a test file, but I think we can move it to a `doc.go` file, and that way, we can have a proper package comment explaining how it works.
   
   Also `--package=registration`, not `beam`.
   
   We could also add a whole-file testable example on the side to demonstrate the package's use.
   
   https://go.dev/blog/examples#:~:text=Larger%20examples



##########
sdks/go/pkg/beam/registration/registration_test.go:
##########
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/specialize
+//go:generate specialize --package=beam --input=registration.tmpl --x=data,universals --imports=typex
+//go:generate go fmt
+
+package beam

Review Comment:
   ```suggestion
   package registration
   ```



##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,130 @@ func upto(i int) []int {
 	}
 	return ret
 }
+
+func add(i int, j int) int {
+	return i + j
+}
+
+func mult(i int, j int) int {
+	return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+	dict := make(map[string]interface{}, len(values)/2)
+	if len(values)%2 != 0 {
+		panic("Invalid dictionary call")
+	}
+	for i := 0; i < len(values); i += 2 {
+		dict[values[i].(string)] = values[i+1]
+	}
+
+	return dict
+}
+
+func list(values ...string) []string {
+	return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+	seenElements := false
+	typing := ""
+	if in > 0 {
+		typing += fmt.Sprintf("[I%v", 0)
+		for i := 1; i < in; i++ {
+			typing += fmt.Sprintf(", I%v", i)
+		}
+		seenElements = true
+	}
+	if out > 0 {
+		i := 0
+		if !seenElements {
+			typing += fmt.Sprintf("[R%v", 0)
+			i++
+		}
+		for i < out {
+			typing += fmt.Sprintf(", R%v", i)
+			i++
+		}
+		seenElements = true
+	}
+
+	if seenElements {
+		if includeType {
+			typing += " any"
+		}
+		typing += "]"
+	}
+
+	return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string {

Review Comment:
   I would recommend adding a test to this package/binary for this. It's allowed to add tests to `package main`.
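
A minimal sketch of what such a check could look like, using `genericTypingRepresentation` from the diff above because its full body is visible there. The `typingCase` type and the `checkTyping` helper are hypothetical names introduced only for illustration. In the real package the table would more likely live in a `main_test.go` file and be driven by a `func TestXxx(t *testing.T)`; a `main` function is used here only so the sketch runs on its own.

```go
package main

import "fmt"

// genericTypingRepresentation is copied from the diff under review so that
// this sketch compiles without the rest of the specialize binary.
func genericTypingRepresentation(in int, out int, includeType bool) string {
	seenElements := false
	typing := ""
	if in > 0 {
		typing += fmt.Sprintf("[I%v", 0)
		for i := 1; i < in; i++ {
			typing += fmt.Sprintf(", I%v", i)
		}
		seenElements = true
	}
	if out > 0 {
		i := 0
		if !seenElements {
			typing += fmt.Sprintf("[R%v", 0)
			i++
		}
		for i < out {
			typing += fmt.Sprintf(", R%v", i)
			i++
		}
		seenElements = true
	}
	if seenElements {
		if includeType {
			typing += " any"
		}
		typing += "]"
	}
	return typing
}

// typingCase is a hypothetical table entry; it is not part of the PR.
type typingCase struct {
	in, out     int
	includeType bool
	want        string
}

// checkTyping (hypothetical) returns one message per case whose output
// differs from the expected string.
func checkTyping(cases []typingCase) []string {
	var failures []string
	for _, c := range cases {
		got := genericTypingRepresentation(c.in, c.out, c.includeType)
		if got != c.want {
			failures = append(failures,
				fmt.Sprintf("(%d, %d, %v) = %q, want %q", c.in, c.out, c.includeType, got, c.want))
		}
	}
	return failures
}

func main() {
	cases := []typingCase{
		{0, 0, true, ""},
		{2, 0, true, "[I0, I1 any]"},
		{0, 2, true, "[R0, R1 any]"},
		{2, 1, false, "[I0, I1, R0]"},
	}
	for _, f := range checkTyping(cases) {
		fmt.Println("FAIL:", f)
	}
}
```

The same table shape should carry over to `possibleBundleLifecycleParameterCombos` once its expected combinations are written down.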



##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {
+    {{$lowerName}}In := -1
+	{{$lowerName}}Out := -1
+	{{$lowerName}}Method := reflect.ValueOf(doFn).MethodByName("{{$upperName}}")
+	if {{$lowerName}}Method.IsValid() {
+		{{$lowerName}}In = {{$lowerName}}Method.Type().NumIn()
+		{{$lowerName}}Out = {{$lowerName}}Method.Type().NumOut()
+	    switch {
+{{range $funcIn := upto $startFinishBundleMaxIn}}
+    case {{$lowerName}}In == {{$funcIn}}:
+            switch { {{range $funcOut := upto 2}}{{$possibleCombos := (possibleBundleLifecycleParameterCombos $funcIn $numParams)}}{{if $possibleCombos}}
+            case {{$lowerName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first = false}}                {{else}} else {{end}}if _, ok := doFn.({{$lowerName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); ok {
+                    reflectx.RegisterFunc(reflect.TypeOf((*func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), func(fn interface{}) reflectx.Func { return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}{fn: fn.(func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})} })
+                    {{$funcRegister := (makeStructRegisterEntry $lowerName $upperName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = (makeStructRegisterEntry $lowerName $upperName $funcCombo (list "error"))}}{{end}}
+                    return func(fn interface{}) reflectx.Func { return {{$funcRegister}} }

Review Comment:
   This block, which wraps to a caller generation, feels like an opportunity to reduce lines. If we add one further layer of generic helpers (0x0 through 7x1) for the 'caller' generation and wrapping, we could save 6 lines per invocation.
   
   Defining them should take 10 lines each, including the blank-line spacer. With 14 helpers shared between StartBundle and FinishBundle, and this block occurring roughly 2600 times, that's 140 new lines but still around 16k lines saved or so.
   
   



##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {
+    {{$lowerName}}In := -1
+	{{$lowerName}}Out := -1
+	{{$lowerName}}Method := reflect.ValueOf(doFn).MethodByName("{{$upperName}}")
+	if {{$lowerName}}Method.IsValid() {
+		{{$lowerName}}In = {{$lowerName}}Method.Type().NumIn()
+		{{$lowerName}}Out = {{$lowerName}}Method.Type().NumOut()
+	    switch {
+{{range $funcIn := upto $startFinishBundleMaxIn}}
+    case {{$lowerName}}In == {{$funcIn}}:
+            switch { {{range $funcOut := upto 2}}{{$possibleCombos := (possibleBundleLifecycleParameterCombos $funcIn $numParams)}}{{if $possibleCombos}}
+            case {{$lowerName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first = false}}                {{else}} else {{end}}if _, ok := doFn.({{$lowerName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); ok {
+                    reflectx.RegisterFunc(reflect.TypeOf((*func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), func(fn interface{}) reflectx.Func { return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}{fn: fn.(func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})} })
+                    {{$funcRegister := (makeStructRegisterEntry $lowerName $upperName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = (makeStructRegisterEntry $lowerName $upperName $funcCombo (list "error"))}}{{end}}
+                    return func(fn interface{}) reflectx.Func { return {{$funcRegister}} }
+                } {{end}}{{end}}{{end}}
+            default:
+                panic("Invalid signature for {{$upperName}}")
+            }
+{{end}}
+        default:
+            panic("Invalid signature for {{$upperName}}")
+        }
+    }
+    return nil
+}
+{{end}}{{end}}{{define "BuildWrapper_SetupTeardown"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "setup")}}{{$lowerName = "setup"}}{{$upperName = "Setup"}}{{end}}{{if (eq .func "teardown")}}{{$lowerName = "teardown"}}{{$upperName = "Teardown"}}{{end}}
+func build{{$upperName}}Wrapper(doFn interface{}) func(interface{}) reflectx.Func {
+    if _, ok := doFn.({{$lowerName}}0x0); ok {
+        {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), {{$lowerName}}Caller)
+
+        return func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.({{$lowerName}}0x0).{{$upperName}}()
+            })
+        }
+    } else if _, ok := doFn.({{$lowerName}}1x0); ok {
+        {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), {{$lowerName}}Caller)
+
+        return func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.({{$lowerName}}1x0).{{$upperName}}(a0)
+            })
+        }
+    } else if _, ok := doFn.({{$lowerName}}0x1); ok {
+        {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), {{$lowerName}}Caller)
+
+        return func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.({{$lowerName}}0x1).{{$upperName}}()
+            })
+        }
+    } else if _, ok := doFn.({{$lowerName}}1x1); ok {
+        {{$lowerName}}Caller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) error)(nil)).Elem(), {{$lowerName}}Caller)
+
+        return func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.({{$lowerName}}1x1).{{$upperName}}(a0)
+            })
+        }
+    }
+
+    return nil
+}
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam

Review Comment:
   
   ```suggestion
   package registration
   ```



##########
sdks/go/pkg/beam/registration/registration.tmpl:
##########
@@ -0,0 +1,252 @@
+{{define "BuildWrapper_StartFinishBundle"}}
+{{$lowerName := "unknown"}}{{$upperName := "unknown"}}{{if (eq .func "startBundle")}}{{$lowerName = "startBundle"}}{{$upperName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$lowerName = "finishBundle"}}{{$upperName = "FinishBundle"}}{{end}}{{$startFinishBundleMaxIn := .startFinishBundleMaxIn}}{{$processElementMaxIn := .processElementMaxIn}}
+{{range $numParams := upto $processElementMaxIn}}
+func build{{$upperName}}Wrapper{{$numParams}}{{(genericTypingRepresentation $numParams 0 true)}}(doFn interface{}) func(interface{}) reflectx.Func {

Review Comment:
   The vast bulk of the generated file handles StartBundle and FinishBundle. I'm curious whether we can somehow modify what we have as `buildStartBundleWrapper9`, passing in the parameter cap and populating the unused types with `int` or `struct{}`?
   
   (The raw file is viewable at least, which is nice https://raw.githubusercontent.com/apache/beam/9490267e52bb8f50a829f8c683db0beebc0b089a/sdks/go/pkg/beam/registration/registration.go)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765192)
    Time Spent: 3h 10m  (was: 3h)

> [Go SDK] Allow users to optimize DoFns with a single generic registration function
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-14347
>                 URL: https://issues.apache.org/jira/browse/BEAM-14347
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Danny McCormick
>            Assignee: Danny McCormick
>            Priority: P2
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Right now, to optimize DoFn execution, users have to use the code generator. This change allows them to use generics instead.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)