Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/22 16:38:53 UTC

[GitHub] [beam] lostluck opened a new pull request #12903: [BEAM-9616] Add RegisterDoFn

lostluck opened a new pull request #12903:
URL: https://github.com/apache/beam/pull/12903


   Adds a beam.RegisterDoFn function that users can call on their DoFns instead of individually calling RegisterFunction and RegisterType for every parameter type. RegisterDoFn uses reflection to discover the function and types that need registering.
   
   These registrations are important to ensure that workers can process and look up types and functions correctly.
   
   TODO in later PRs:
   * Update starcgen to key off of RegisterDoFn calls for generating type assertion shims.
   * Make use of RegisterDoFn everywhere in the Go SDK and examples, and document it as a best practice.
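The first TODO has starcgen key off RegisterDoFn calls. The sketch below shows one way a generator could locate those calls with the standard `go/ast` packages. It is not how starcgen is implemented. `findRegisterDoFnArgs` is a hypothetical helper, and it matches the selector text `beam.RegisterDoFn` without resolving imports, so an aliased import would be missed.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
)

// findRegisterDoFnArgs (hypothetical) parses Go source and returns the source
// text of the first argument of every call spelled beam.RegisterDoFn(...).
// It does not type-check, so referenced identifiers need not be defined.
func findRegisterDoFnArgs(src string) ([]string, error) {
	fset := token.NewFileSet()
	file, err := parser.ParseFile(fset, "input.go", src, 0)
	if err != nil {
		return nil, err
	}
	var args []string
	ast.Inspect(file, func(n ast.Node) bool {
		call, ok := n.(*ast.CallExpr)
		if !ok || len(call.Args) == 0 {
			return true
		}
		sel, ok := call.Fun.(*ast.SelectorExpr)
		if !ok || sel.Sel.Name != "RegisterDoFn" {
			return true
		}
		pkg, ok := sel.X.(*ast.Ident)
		if !ok || pkg.Name != "beam" {
			return true
		}
		// Slice the argument's exact text out of the original source.
		start := fset.Position(call.Args[0].Pos()).Offset
		end := fset.Position(call.Args[0].End()).Offset
		args = append(args, src[start:end])
		return true
	})
	return args, nil
}

// example is hypothetical user code; formatFn and extractFn are not defined.
const example = `package wordcount

import (
	"reflect"

	"github.com/apache/beam/sdks/go/pkg/beam"
)

func init() {
	beam.RegisterDoFn(formatFn)
	beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
}
`

func main() {
	args, err := findRegisterDoFnArgs(example)
	if err != nil {
		panic(err)
	}
	for _, a := range args {
		fmt.Println(a)
	}
}
```

A real generator would then resolve each argument to a type with `go/types` before emitting shims. That step is omitted here.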
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12903:
URL: https://github.com/apache/beam/pull/12903#issuecomment-696839567


   R: @youngoli @tysonjh 





[GitHub] [beam] youngoli commented on a change in pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12903:
URL: https://github.com/apache/beam/pull/12903#discussion_r494697535



##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be referred to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()
+	}
+	fn, err := graph.NewFn(dofn)
+	if err != nil {
+		return nil, nil, err
+	}
+	c := cache{}
+	var valid bool
+	// Validates that this is a DoFn or combineFn.
+	do, err := graph.AsDoFn(fn, graph.MainUnknown)
+	if err == nil {
+		valid = true
+		handleDoFn(do, c)
+	} else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+		valid = true
+		handleCombineFn(cmb, c)
+	}
+	if !valid {
+		// Return the DoFn specific error as that's more common.
+		return nil, nil, err
+	}
+
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)

Review comment:
       Could this just use the `rt` from near the top of the function? It's a bit confusing right now with it named the same as a different scoped variable in the same function.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()

Review comment:
       What's the reason for creating a new interface from a reflect type for dofn here? Is it a way to reset the dofn to its default values by creating a new instance?

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)
+	switch rt.Kind() {
+	case reflect.Func:
+		retFunc = dofn
+		c.regFuncTypes(rt)
+	default:
+		c.regType(rt)
+	}
+	var retTypes []reflect.Type
+	for _, t := range c {
+		retTypes = append(retTypes, t)
+	}
+	return retFunc, retTypes, nil
+}
+
+func handleDoFn(fn *graph.DoFn, c cache) {
+	c.pullMethod(fn.SetupFn())
+	c.pullMethod(fn.StartBundleFn())
+	c.pullMethod(fn.ProcessElementFn())
+	c.pullMethod(fn.FinishBundleFn())
+	c.pullMethod(fn.TeardownFn())
+	if !fn.IsSplittable() {
+		return
+	}
+	sdf := (*graph.SplittableDoFn)(fn)
+	c.pullMethod(sdf.CreateInitialRestrictionFn())
+	c.pullMethod(sdf.CreateTrackerFn())

Review comment:
       Note that while restrictions need to be serializable, restriction trackers don't (and probably can't). Registering RTrackers is probably harmless as long as nothing actually tries to use the registered type, but otherwise you can probably avoid registering them by removing the line that pulls CreateTrackerFn.

##########
File path: sdks/go/pkg/beam/core/runtime/symbols.go
##########
@@ -72,7 +72,7 @@ type SymbolResolver interface {
 // RegisterFunction allows function registration. It is beneficial for performance
 // and is needed for functions -- such as custom coders -- serialized during unit
 // tests, where the underlying symbol table is not available. It should be called
-// in init() only. Returns the external key for the function.
+// in `init()` only.

Review comment:
       Nitpick: Is removing the last sentence intentional? If so, should it also be removed in forward.go?







[GitHub] [beam] lostluck commented on pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12903:
URL: https://github.com/apache/beam/pull/12903#issuecomment-699045565


   I ended up adding the usage documentation; may as well start off on the right foot with the recommended boilerplate.





[GitHub] [beam] lostluck merged pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #12903:
URL: https://github.com/apache/beam/pull/12903


   





[GitHub] [beam] youngoli commented on a change in pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12903:
URL: https://github.com/apache/beam/pull/12903#discussion_r494697535



##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be refered to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()
+	}
+	fn, err := graph.NewFn(dofn)
+	if err != nil {
+		return nil, nil, err
+	}
+	c := cache{}
+	var valid bool
+	// Validates that this is a DoFn or combineFn.
+	do, err := graph.AsDoFn(fn, graph.MainUnknown)
+	if err == nil {
+		valid = true
+		handleDoFn(do, c)
+	} else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+		valid = true
+		handleCombineFn(cmb, c)
+	}
+	if !valid {
+		// Return the DoFn specific error as that's more common.
+		return nil, nil, err
+	}
+
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)

Review comment:
       Could this just use the `rt` from near the top of the function? It's a bit confusing right now with it named the same as a different scoped variable in the same function.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be refered to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Differs from RegisterFunction and RegisterType by introspecting
+// all parameters and returns of Lifecycle methods on the dofn,
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()

Review comment:
       What's the reason for creating a new interface from a reflect type for dofn here? Is it a way to reset the dofn to it's default values by creating a new instance?

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package genx is a convenience package to better support the code
+// generator. It can be depended on by the user facing beam package
+// and be referred to by generated code.
+//
+// Similarly, it can depend on beam internals and access the canonical
+// method list in the graph package, or other packages to filter out
+// types that aren't necessary for registration (like context.Context).
+package genx
+
+import (
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+)
+
+// RegisterDoFn is a convenience function for registering DoFns.
+// Unlike RegisterFunction and RegisterType, it introspects
+// all parameters and returns of the dofn's lifecycle methods
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()
+	}
+	fn, err := graph.NewFn(dofn)
+	if err != nil {
+		return nil, nil, err
+	}
+	c := cache{}
+	var valid bool
+	// Validates that this is a DoFn or combineFn.
+	do, err := graph.AsDoFn(fn, graph.MainUnknown)
+	if err == nil {
+		valid = true
+		handleDoFn(do, c)
+	} else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+		valid = true
+		handleCombineFn(cmb, c)
+	}
+	if !valid {
+		// Return the DoFn specific error as that's more common.
+		return nil, nil, err
+	}
+
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)
+	switch rt.Kind() {
+	case reflect.Func:
+		retFunc = dofn
+		c.regFuncTypes(rt)
+	default:
+		c.regType(rt)
+	}
+	var retTypes []reflect.Type
+	for _, t := range c {
+		retTypes = append(retTypes, t)
+	}
+	return retFunc, retTypes, nil
+}
+
+func handleDoFn(fn *graph.DoFn, c cache) {
+	c.pullMethod(fn.SetupFn())
+	c.pullMethod(fn.StartBundleFn())
+	c.pullMethod(fn.ProcessElementFn())
+	c.pullMethod(fn.FinishBundleFn())
+	c.pullMethod(fn.TeardownFn())
+	if !fn.IsSplittable() {
+		return
+	}
+	sdf := (*graph.SplittableDoFn)(fn)
+	c.pullMethod(sdf.CreateInitialRestrictionFn())
+	c.pullMethod(sdf.CreateTrackerFn())

Review comment:
       Note that while restrictions need to be serializable, restriction trackers don't (and probably can't). Registering RTrackers is probably not a problem as long as nothing actually tries to use the registered type, but otherwise you can probably avoid registering RTrackers by removing this line pulling CreateTrackerFn.

##########
File path: sdks/go/pkg/beam/core/runtime/symbols.go
##########
@@ -72,7 +72,7 @@ type SymbolResolver interface {
 // RegisterFunction allows function registration. It is beneficial for performance
 // and is needed for functions -- such as custom coders -- serialized during unit
 // tests, where the underlying symbol table is not available. It should be called
-// in init() only. Returns the external key for the function.
+// in `init()` only.

Review comment:
       Nitpick: Is removing the last sentence intentional? If so, should it also be removed in forward.go?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #12903:
URL: https://github.com/apache/beam/pull/12903#discussion_r495104599



##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// RegisterDoFn is a convenience function for registering DoFns.
+// Unlike RegisterFunction and RegisterType, it introspects
+// all parameters and returns of the dofn's lifecycle methods
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()
+	}
+	fn, err := graph.NewFn(dofn)
+	if err != nil {
+		return nil, nil, err
+	}
+	c := cache{}
+	var valid bool
+	// Validates that this is a DoFn or combineFn.
+	do, err := graph.AsDoFn(fn, graph.MainUnknown)
+	if err == nil {
+		valid = true
+		handleDoFn(do, c)
+	} else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+		valid = true
+		handleCombineFn(cmb, c)
+	}
+	if !valid {
+		// Return the DoFn specific error as that's more common.
+		return nil, nil, err
+	}
+
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)

Review comment:
       No, that rt is out of scope: it's scoped to the if statement and only exists if the user passed in a reflect.Type instance. It's also nearly 30 lines up, and that distance between declaration and use hampers readability a bit. This rt is adjacent to its uses, and the if block at the start ensures there's only one way to interpret it here: it's the type of the dofn.
   
   This follows the same convention as calling all error values err: I always call arbitrary reflect.Type instances rt.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// RegisterDoFn is a convenience function for registering DoFns.
+// Unlike RegisterFunction and RegisterType, it introspects
+// all parameters and returns of the dofn's lifecycle methods
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()

Review comment:
       The graph.NewFn validator requires an actual instance of the DoFn, since graph.NewFn was designed for pipeline construction. This line creates such an instance when a reflect.Type is passed in.

##########
File path: sdks/go/pkg/beam/core/runtime/symbols.go
##########
@@ -72,7 +72,7 @@ type SymbolResolver interface {
 // RegisterFunction allows function registration. It is beneficial for performance
 // and is needed for functions -- such as custom coders -- serialized during unit
 // tests, where the underlying symbol table is not available. It should be called
-// in init() only. Returns the external key for the function.
+// in `init()` only.

Review comment:
       Great catch! It should be removed from forward.go as well. I removed it because this function never returns anything, so the documentation was incorrect.

##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// RegisterDoFn is a convenience function for registering DoFns.
+// Unlike RegisterFunction and RegisterType, it introspects
+// all parameters and returns of the dofn's lifecycle methods
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()
+	}
+	fn, err := graph.NewFn(dofn)
+	if err != nil {
+		return nil, nil, err
+	}
+	c := cache{}
+	var valid bool
+	// Validates that this is a DoFn or combineFn.
+	do, err := graph.AsDoFn(fn, graph.MainUnknown)
+	if err == nil {
+		valid = true
+		handleDoFn(do, c)
+	} else if cmb, err2 := graph.AsCombineFn(fn); err2 == nil {
+		valid = true
+		handleCombineFn(cmb, c)
+	}
+	if !valid {
+		// Return the DoFn specific error as that's more common.
+		return nil, nil, err
+	}
+
+	var retFunc interface{}
+	rt := reflect.TypeOf(dofn)
+	switch rt.Kind() {
+	case reflect.Func:
+		retFunc = dofn
+		c.regFuncTypes(rt)
+	default:
+		c.regType(rt)
+	}
+	var retTypes []reflect.Type
+	for _, t := range c {
+		retTypes = append(retTypes, t)
+	}
+	return retFunc, retTypes, nil
+}
+
+func handleDoFn(fn *graph.DoFn, c cache) {
+	c.pullMethod(fn.SetupFn())
+	c.pullMethod(fn.StartBundleFn())
+	c.pullMethod(fn.ProcessElementFn())
+	c.pullMethod(fn.FinishBundleFn())
+	c.pullMethod(fn.TeardownFn())
+	if !fn.IsSplittable() {
+		return
+	}
+	sdf := (*graph.SplittableDoFn)(fn)
+	c.pullMethod(sdf.CreateInitialRestrictionFn())
+	c.pullMethod(sdf.CreateTrackerFn())

Review comment:
       RTrackers would be registered anyway, since they show up in the ProcessElement signature. It's OK for extra types to be registered; the framework just won't automatically synthesize them the way it needs to for other elements.







[GitHub] [beam] youngoli commented on a change in pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #12903:
URL: https://github.com/apache/beam/pull/12903#discussion_r495407788



##########
File path: sdks/go/pkg/beam/core/runtime/genx/genx.go
##########
@@ -0,0 +1,172 @@
+// RegisterDoFn is a convenience function for registering DoFns.
+// Unlike RegisterFunction and RegisterType, it introspects
+// all parameters and returns of the dofn's lifecycle methods
+// and registers those types for you.
+//
+// Panics if not passed a dofn.
+func RegisterDoFn(dofn interface{}) {
+	f, ts, err := registerDoFn(dofn)
+	if err != nil {
+		panic(err)
+	}
+	if f != nil {
+		runtime.RegisterFunction(f)
+	}
+	for _, t := range ts {
+		runtime.RegisterType(t)
+	}
+}
+
+// registerDoFn returns all types associated with the provided DoFn.
+// If passed a functional DoFn, the first return is a Function to
+// register with runtime.RegisterFunction.
+// The second return is all types to register with runtime.RegisterType.
+// Returns an error if the passed in values are not DoFns.
+func registerDoFn(dofn interface{}) (interface{}, []reflect.Type, error) {
+	if rt, ok := dofn.(reflect.Type); ok {
+		if rt.Kind() == reflect.Ptr {
+			rt = rt.Elem()
+		}
+		dofn = reflect.New(rt).Interface()

Review comment:
       Oh, duh. I had a brain-fart that this statement only happens if the dofn is a reflect.Type. I was imagining turning an actual DoFn instance into a reflect.Type and back. Makes complete sense now.







[GitHub] [beam] lostluck commented on pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12903:
URL: https://github.com/apache/beam/pull/12903#issuecomment-699041060


   Thanks for the review! I'm also not super adept with ast, so we're in the same boat. The short version is that it breaks down all the Go syntax for us. I did cover a good number of reasonable cases that work with RegisterDoFn, and we'll avoid the rest by making sure the final documentation and the examples are complete and consistent, which steers users away from cases that can't be handled.





[GitHub] [beam] lostluck commented on pull request #12903: [BEAM-9616] Add RegisterDoFn

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #12903:
URL: https://github.com/apache/beam/pull/12903#issuecomment-699292528


   Heh, no worries.
   
   On Fri, Sep 25, 2020, 8:18 PM Daniel Oliveira <no...@github.com>
   wrote:
   
   > *@youngoli* commented on this pull request.
   > ------------------------------
   >
   > In sdks/go/pkg/beam/core/runtime/genx/genx.go
   > <https://github.com/apache/beam/pull/12903#discussion_r495407788>:
   >
   > Oh, duh. I had a brain-fart that this statement only happens if the dofn
   > is a reflect.Type. I was imagining turning an actual DoFn instance into a
   > reflect.Type and back. Makes complete sense now.
   

