You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/27 05:13:59 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #17429: [BEAM-14347] Allow users to optimize DoFn execution with a single generic registration function

lostluck commented on code in PR #17429:
URL: https://github.com/apache/beam/pull/17429#discussion_r859383238


##########
sdks/go/cmd/specialize/main.go:
##########
@@ -238,3 +246,133 @@ func upto(i int) []int {
 	}
 	return ret
 }
+
+func add(i int, j int) int {
+	return i + j
+}
+
+func mult(i int, j int) int {
+	return i * j
+}
+
+func dict(values ...interface{}) map[string]interface{} {
+	dict := make(map[string]interface{}, len(values)/2)
+	if len(values)%2 != 0 {
+		panic("Invalid dictionary call")
+	}
+	for i := 0; i < len(values); i += 2 {
+		dict[values[i].(string)] = values[i+1]
+	}
+
+	return dict
+}
+
+func list(values ...string) []string {
+	return values
+}
+
+func genericTypingRepresentation(in int, out int, includeType bool) string {
+	seenElements := false
+	typing := ""
+	if in > 0 {
+		typing += fmt.Sprintf("[I%v", 0)
+		for i := 1; i < in; i++ {
+			typing += fmt.Sprintf(", I%v", i)
+		}
+		seenElements = true
+	}
+	if out > 0 {
+		i := 0
+		if !seenElements {
+			typing += fmt.Sprintf("[R%v", 0)
+			i++
+		}
+		for i < out {
+			typing += fmt.Sprintf(", R%v", i)
+			i++
+		}
+		seenElements = true
+	}
+
+	if seenElements {
+		if includeType {
+			typing += " any"
+		}
+		typing += "]"
+	}
+
+	return typing
+}
+
+func possibleBundleLifecycleParameterCombos(numInInterface interface{}, processElementInInterface interface{}) [][]string {
+	numIn := numInInterface.(int)
+	processElementIn := processElementInInterface.(int)
+	ordered_known_parameter_options := []string{"context.Context", "typex.PaneInfo", "[]typex.Window", "typex.EventTime", "typex.BundleFinalization"}
+	// Because of how Bundle lifecycle functions are invoked, all known parameters must precede unknown options and be in order.
+	// Once we hit an unknown option, all remaining unknown options must be included since all iters/emitters must be included
+	// Therefore, we can generate a powerset of the known options and fill out any remaining parameters with an ordered set of remaining unknown options
+	pSetSize := int(math.Pow(2, float64(len(ordered_known_parameter_options))))
+	combos := make([][]string, 0, pSetSize)
+
+	var index int
+	for index < pSetSize {

Review Comment:
   There doesn't seem to be any reason not to use a standard `for index := 0; index < pSetSize; index++ {` construct here.
   
   (Don't forget to remove the `index++` from the bottom of the loop.)



##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}

Review Comment:
   I strongly recommend we have the registration methods in a *different* package than the main user surface beam package. Keeping them in the main beam package would make the package doc much harder to use.  
   
   If the package is called `register` then the methods become `register.DoFnNxMxO` etc. which read pretty cleanly. This also avoids forcing generics into the main beam package.



##########
sdks/go/pkg/beam/registration.tmpl:
##########
@@ -0,0 +1,292 @@
+{{define "StructWrappersAndFuncs_StartFinishBundle"}}
+{{$funcName := "unknown"}}{{$structName := "unknown"}}{{if (eq .func "startBundle")}}{{$funcName = "startBundle"}}{{$structName = "StartBundle"}}{{end}}{{if (eq .func "finishBundle")}}{{$funcName = "finishBundle"}}{{$structName = "FinishBundle"}}{{end}}
+	{{$funcName}}In := -1
+	{{$funcName}}Out := -1
+    var {{$funcName}}Wrapper func(fn interface{}) reflectx.Func
+	{{$funcName}}Method := reflect.ValueOf(doFn).MethodByName("{{$structName}}")
+	if {{$funcName}}Method.IsValid() {
+		{{$funcName}}In = {{$funcName}}Method.Type().NumIn()
+		{{$funcName}}Out = {{$funcName}}Method.Type().NumOut()
+	    switch {
+{{range $funcIn := upto 8}}
+    case {{$funcName}}In == {{$funcIn}}:
+            switch { {{range $funcOut := upto 2}}{{$possibleCombos := (possibleBundleLifecycleParameterCombos $funcIn $.processElementIn)}}{{if $possibleCombos}}
+            case {{$funcName}}Out == {{$funcOut}}:
+{{$first := true}}{{range $funcCombo := $possibleCombos}}{{if $first}}{{$first = false}}                {{else}} else {{end}}if _, ok := doFn.({{$funcName}}{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}); ok {
+                    {{$funcName}}Caller := func(fn interface{}) reflectx.Func {
+                        f := fn.(func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})
+                        return &caller{{$funcIn}}x{{$funcOut}}{{if (or $funcIn $funcOut)}}[{{(join $funcCombo ", ")}}{{if $funcOut}}{{if $funcIn}}, {{end}}error{{end}}]{{end}}{fn: f}
+                    }
+                    reflectx.RegisterFunc(reflect.TypeOf((*func({{(join $funcCombo ", ")}}){{if $funcOut}} error{{end}})(nil)).Elem(), {{$funcName}}Caller)
+                    {{$funcRegister := (makeStructRegisterEntry $funcName $structName $funcCombo (list))}}{{if $funcOut}}{{$funcRegister = (makeStructRegisterEntry $funcName $structName $funcCombo (list "error"))}}{{end}}
+                    {{$funcName}}Wrapper = func(fn interface{}) reflectx.Func {
+                        return {{$funcRegister}}
+                    }
+                } {{end}}{{end}}{{end}}
+            default:
+                panic("Invalid signature for {{$structName}}")
+            }
+{{end}}
+        default:
+            panic("Invalid signature for {{$structName}}")
+        }
+    }
+{{end}}
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated from registration.tmpl. DO NOT EDIT.
+
+package beam
+
+import (
+	"context"
+	"reflect"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+)
+{{$processElementMaxOut := 5}}{{$processElementMaxIn := 9}}{{$startFinishBundleOutRange := 2}}{{$startFinishBundleInRange := 8}}{{range $processElementOut := upto $processElementMaxOut}}{{range $processElementIn := upto $processElementMaxIn}}
+type doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut true)}} interface {
+    ProcessElement({{range $in := upto $processElementIn}}{{if $in}}, {{end}}i{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+type caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut true)}} struct {
+    fn func({{range $in := upto $processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}}
+}
+
+func (c *caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}) Name() string {
+	return reflectx.FunctionName(c.fn)
+}
+
+func (c *caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}) Type() reflect.Type {
+	return reflect.TypeOf(c.fn)
+}
+
+func (c *caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}) Call(args []interface{}) []interface{} {
+    {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}} := {{end}}c.fn({{range $in := upto $processElementIn}}{{if $in}}, {{end}}args[{{$in}}].(I{{$in}}){{end}})
+	return []interface{}{ {{if $processElementOut}}{{range $out := upto $processElementOut}}{{if $out}}, {{end}}out{{$out}}{{end}}{{end}} }
+}
+
+func (c *caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}) Call{{$processElementIn}}x{{$processElementOut}}({{range $in := upto $processElementIn}}{{if $in}}, {{end}}arg{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}interface{}{{end}}){{end}} {
+    {{if $processElementOut}}return {{end}}c.fn({{range $in := upto $processElementIn}}{{if $in}}, {{end}}arg{{$in}}{{end}})
+}
+
+func registerDoFn{{$processElementIn}}x{{$processElementOut}}StructWrappersAndFuncs{{(genericTypingRepresentation $processElementIn $processElementOut true)}}(doFn doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}) {
+    processElementCaller := func(fn interface{}) reflectx.Func {
+		f := fn.(func({{range $in := upto $processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})
+		return &caller{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}{fn: f}
+	}
+	reflectx.RegisterFunc(reflect.TypeOf((*func({{range $in := upto $processElementIn}}{{if $in}}, {{end}}I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}})(nil)).Elem(), processElementCaller)
+    processElementWrapper := func(fn interface{}) reflectx.Func {
+        return reflectx.MakeFunc(func({{range $in := upto $processElementIn}}{{if $in}}, {{end}}a{{$in}} I{{$in}}{{end}}){{if $processElementOut}} ({{range $out := upto $processElementOut}}{{if $out}}, {{end}}R{{$out}}{{end}}){{end}} {
+            {{if $processElementOut}}return {{end}}fn.(doFn{{$processElementIn}}x{{$processElementOut}}{{(genericTypingRepresentation $processElementIn $processElementOut false)}}).ProcessElement({{range $in := upto $processElementIn}}{{if $in}}, {{end}}a{{$in}}{{end}})
+        })
+    }{{template "StructWrappersAndFuncs_StartFinishBundle" (dict "processElementIn" $processElementIn "processElementOut" $processElementOut "func" "startBundle")}}{{template "StructWrappersAndFuncs_StartFinishBundle" (dict "processElementIn" $processElementIn "processElementOut" $processElementOut "func" "finishBundle")}}
+    var setupWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(setup0x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(setup0x0).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x0); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(setup1x0).Setup(a0)
+            })
+        }
+    } else if _, ok := doFn.(setup0x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(setup0x1).Setup()
+            })
+        }
+    } else if _, ok := doFn.(setup1x1); ok {
+        setupCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) error)(nil)).Elem(), setupCaller)
+
+        setupWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(setup1x1).Setup(a0)
+            })
+        }
+    }
+    var teardownWrapper func(fn interface{}) reflectx.Func
+    if _, ok := doFn.(teardown0x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func())
+            return &caller0x0{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() {
+                fn.(teardown0x0).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x0); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context))
+            return &caller1x0[context.Context]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context))(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) {
+                fn.(teardown1x0).Teardown(a0)
+            })
+        }
+    } else if _, ok := doFn.(teardown0x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func() error)
+            return &caller0x1[error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func() error)(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func() error {
+                return fn.(teardown0x1).Teardown()
+            })
+        }
+    } else if _, ok := doFn.(teardown1x1); ok {
+        teardownCaller := func(fn interface{}) reflectx.Func {
+            f := fn.(func(context.Context) error)
+            return &caller1x1[context.Context, error]{fn: f}
+        }
+        reflectx.RegisterFunc(reflect.TypeOf((*func(context.Context) error)(nil)).Elem(), teardownCaller)
+
+        teardownWrapper = func(fn interface{}) reflectx.Func {
+            return reflectx.MakeFunc(func(a0 context.Context) error {
+                return fn.(teardown1x1).Teardown(a0)
+            })
+        }
+    }
+    wrapperFn := func(fn interface{}) map[string]reflectx.Func {
+        m := map[string]reflectx.Func{}
+        if processElementWrapper != nil {
+            m["ProcessElement"] = processElementWrapper(fn)
+        }
+        if startBundleWrapper != nil {
+            m["StartBundle"] = startBundleWrapper(fn)
+        }
+        if finishBundleWrapper != nil {
+            m["FinishBundle"] = finishBundleWrapper(fn)
+        }
+        if setupWrapper != nil {
+            m["Setup"] = setupWrapper(fn)
+        }
+        if teardownWrapper != nil {
+            m["Teardown"] = teardownWrapper(fn)
+        }
+        
+        return m
+    }

Review Comment:
   This is probably something you had in mind for helping reduce the size of the generated code since a quick skim reads pretty static...
   
   Do recall that if a helper doesn't need arity assistance, it may be able to be put into a non-generated file in the same package, and tested normally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org