Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/02/27 20:50:40 UTC

[GitHub] [beam] youngoli opened a new pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

youngoli opened a new pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991
 
 
   The current version of this validation is a bit permissive because it
   doesn't require the number of main inputs to be specified. This change
   allows specifying the number of main inputs, while also preserving
   the existing code path of not specifying it. Along with that change,
   I made some refactors to existing validation code to try to improve
   readability and make it more organized.
   
   This is filed under BEAM-3301 (SDF) because it is intended to enable
   validation for SDFs which is difficult without a known number of
   main inputs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r391956907
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -239,52 +279,50 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 		return nil, addContext(err, fn)
 	}
 
-	// Start validating DoFn. First, check that ProcessElement has a main input.
+	// Validate ProcessElement has correct number of main inputs (as indicated by
+	// numMainIn), and that main inputs are before side inputs.
 	processFn := fn.methods[processElementName]
-	pos, num, ok := processFn.Inputs()
-	if ok {
-		first := processFn.Param[pos].Kind
-		if first != funcx.FnValue {
-			err := errors.New("side input parameters must follow main input parameter")
-			err = errors.SetTopLevelMsgf(err,
-				"Method %v of DoFns should always have a main input before side inputs, "+
-					"but it has side inputs (as Iters or ReIters) first in DoFn %v.",
-				processElementName, fn.Name())
-			err = errors.WithContextf(err, "method %v", processElementName)
-			return nil, addContext(err, fn)
-		}
+	if err := validateMainInputs(fn, processFn, processElementName, numMainIn); err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	// If numMainIn is unknown, we can try inferring it from the second input in ProcessElement.
+	// If there is none, or it's not a FnValue type, then we can safely infer that there's only
+	// one main input.
+	pos, num, _ := processFn.Inputs()
+	if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
 
 Review comment:
   validateMainInputs performs error checks that must run before we can infer the number of main inputs (for example, making sure at least one input is present). Moving the inference before validateMainInputs would just mean moving those error checks back above the inference, so nothing would really change.
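
The ordering argument in this comment can be illustrated with a minimal, self-contained sketch. It is not the code in fn.go: `paramKind`, `kindValue`, `kindIter`, the lower-case constants, and `inferMainInputs` are hypothetical stand-ins for `funcx.FnValue`, the `Main*` constants, and the logic inside `AsDoFn`, and the checks are deliberately simplified. It only shows why indexing the second parameter is safe once validation has run first.

```go
package main

import (
	"errors"
	"fmt"
)

// paramKind is a hypothetical stand-in for the SDK's parameter kinds.
type paramKind int

const (
	kindValue paramKind = iota // plays the role of funcx.FnValue
	kindIter                   // plays the role of an Iter/ReIter side input
)

// Hypothetical stand-ins for MainUnknown, MainSingle and MainKV.
const (
	mainUnknown = -1
	mainSingle  = 1
	mainKV      = 2
)

// validateMainInputs runs the checks that inference depends on.
func validateMainInputs(params []paramKind, numMainIn int) error {
	if len(params) == 0 {
		return errors.New("ProcessElement needs at least one input")
	}
	if params[0] != kindValue {
		return errors.New("side input parameters must follow main input parameter")
	}
	if numMainIn == mainKV && (len(params) < 2 || params[1] != kindValue) {
		return errors.New("a KV main input needs two value parameters")
	}
	return nil
}

// inferMainInputs validates first, then infers only when the count is unknown.
func inferMainInputs(params []paramKind, numMainIn int) (int, error) {
	if err := validateMainInputs(params, numMainIn); err != nil {
		return 0, err
	}
	// Validation guarantees len(params) >= 1, and the len == 1 test
	// short-circuits before params[1] is read.
	if numMainIn == mainUnknown && (len(params) == 1 || params[1] != kindValue) {
		numMainIn = mainSingle
	}
	return numMainIn, nil
}

func main() {
	n, err := inferMainInputs([]paramKind{kindValue, kindIter}, mainUnknown)
	fmt.Println(n, err)
}
```

Note that two leading value parameters with an unknown count stay unknown in this sketch, since they could be a KV or a single element followed by a value-typed side input.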


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-592171327
 
 
   R: @lostluck 


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595484373
 
 
   Run Go PostCommit


[GitHub] [beam] youngoli merged pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991
 
 
   


[GitHub] [beam] lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388386045
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,74 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// The following constants prefixed with "Main" represent possible numbers of
 
 Review comment:
   I'm wary about exporting these constants.
   
   For one, they're untyped constants, so they're functionally the numbers themselves. 
   
   Otherwise, the "right" Go way to expose them so they have meaning would be to define an unexported type, so users can't define their own values, and then define the constants with that type.
   
   ```
   type mainInputs int32
   
   const (
     MainUnknown mainInputs = -1
     MainSingle mainInputs = 1
     MainKV mainInputs = 2
   )
   ```
   
   Then any functional-option configuration method can accept them, giving type-safe, pre-validated input numbers.
   
   ```
   func NumInputs(mi mainInputs) Option {
     return func(c *config) {
        c.numMainIn = mi
     }
   }
   ```
   
   This then saves needing to have a validation error, since package users can't define their own mainInputs.
   
   Another alternative is to do away with the exported constants altogether, keep the validation, but simply document that valid inputs are 1 and 2 for singletons and KVs respectively. Either is preferable to the current approach.
   
   Let's not lose sight of the fact that the purpose here is to pass a hint down to make the DoFn parameters easier to analyse. Windows and EventTimes are propagated with the main input, but don't "count" since they are easily detectable by type.
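
As a rough, runnable illustration of the unexported-type suggestion in this comment, the sketch below declares the type and constants from the snippet above and adds a `String` method, which is an addition for illustration only and not part of the review. It is a sketch under those assumptions, not the code that was merged.

```go
package main

import "fmt"

// mainInputs is unexported: other packages cannot name the type, so they
// cannot write conversions such as mainInputs(7) or declare their own
// typed constants of it.
type mainInputs int32

// The exported constants are the only named values other packages can use.
const (
	MainUnknown mainInputs = -1
	MainSingle  mainInputs = 1
	MainKV      mainInputs = 2
)

// String is a hypothetical helper that makes the values self-describing.
func (m mainInputs) String() string {
	switch m {
	case MainUnknown:
		return "unknown"
	case MainSingle:
		return "single"
	case MainKV:
		return "KV"
	default:
		// Convert to int32 so formatting does not call String again.
		return fmt.Sprintf("mainInputs(%d)", int32(m))
	}
}

func main() {
	for _, mi := range []mainInputs{MainUnknown, MainSingle, MainKV} {
		fmt.Printf("%d -> %v\n", int32(mi), mi)
	}
}
```

One caveat worth keeping in mind: Go still lets a caller pass an untyped integer literal (for example `3`) where a `mainInputs` parameter is expected, so this pattern discourages out-of-range values rather than making them strictly impossible.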


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595512813
 
 
   The Postcommit error doesn't seem to be directly related to my change from what I can tell:
   
   > Error message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -488: process bundle failed for instruction -488 using plan -445 : panic: Unexpected coder: CoGBK<string[string],int[varintz],int[varintz],string[string]> goroutine 87 [running]:
   > runtime/debug.Stack(0xc00109d970, 0xd2c5e0, 0xc00113cb00)
   > 	/usr/lib/go-1.12/src/runtime/debug/stack.go:24 +0x9d
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc00109db90)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:40 +0x60
   > panic(0xd2c5e0, 0xc00113cb00)
   > 	/usr/lib/go-1.12/src/runtime/panic.go:522 +0x1b5
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MakeElementEncoder(0xc00009bdb0, 0xc00114b620, 0xc000822000)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/coder.go:91 +0x479
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*PCollection).Up(0xc000c20fc0, 0x10018e0, 0xc000c40f00, 0x0, 0xc0010b7b50)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pcollection.go:59 +0xfe
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x10018e0, 0xc000c40f00, 0xc0010b7c28, 0x0, 0x0)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:43 +0x6c
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc001222ee0, 0x10018e0, 0xc000c40f00, 0xc000d1a490, 0x4, 0xff0340, 0xc00114b440, 0xff0380, 0xc000c40f40, 0xc0010b7de0, ...)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:93 +0xdf
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0001f4480, 0x10017a0, 0xc0001bafc0, 0xc000c40d40, 0xc0001bafc0)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:211 +0xa34
   > github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x10017a0, 0xc0001bafc0, 0xc000c40d40)
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:118 +0x1cf
   > created by github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
   > 	/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Go_PR/src/sdks/go/test/.gogradle/project_gopath/src/github.com/apache/beam/sdks/go/test/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:131 +0x6e8
   > 
   > 	java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   > ...


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-592171160
 
 
   Run Go PostCommit


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-598480073
 
 
   I think doing this validation in the ParDo transform is something worth looking into, and I'd be up for it if it worked as well as you describe. I'm definitely not a fan of having to do validation without any info about the actual output/input involved. I've even entertained the idea of doing something similar, but it would be a decently large refactor (2-3 days?) and has the chance of hitting additional roadblocks, so I haven't really made time to look into it yet. Definitely something worth taking a day or two to look into after SDF is done.


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595512896
 
 
   Run Go PostCommit


[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388609914
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,74 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// The following constants prefixed with "Main" represent possible numbers of
+// DoFn main inputs for DoFn construction and validation. Any value not defined
+// here is an invalid number of main inputs.
+const (
+	MainUnknown = -1 // The number of main inputs is unknown for DoFn validation.
 
 Review comment:
   I'm leaving it exported only because AsDoFn is currently exported and takes one of these constants as an input. Making this unexported would make it impossible to call AsDoFn with the existing behavior (unknown num. of inputs).


[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388089313
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+	unknownInNum = -1 // Used when we don't know the number of main inputs.
+	singleInNum  = 1
+	kvInNum      = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
 	ret, err := NewFn(fn)
 	if err != nil {
 		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
 	}
-	return AsDoFn(ret)
+	return AsDoFn(ret, unknownInNum)
 }
 
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
 
 Review comment:
   Done. I went with the functional options approach (variadic option functions). If anyone else is reading this, I based it on this article: https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
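
For readers unfamiliar with the pattern mentioned here, the following is a minimal sketch of functional options in Go. All names (`config`, `Option`, `NumMainInputs`, `sketchDoFn`, `newSketchDoFn`) are hypothetical and chosen for illustration; they are not claimed to match the signatures that ended up in the Beam Go SDK.

```go
package main

import "fmt"

// config collects optional settings. Hypothetical, for illustration only.
type config struct {
	numMainIn int // -1 means the number of main inputs is unknown
}

// Option mutates a config; callers pass zero or more of them.
type Option func(*config)

// NumMainInputs returns an Option that records the number of main inputs.
func NumMainInputs(n int) Option {
	return func(c *config) {
		c.numMainIn = n
	}
}

// sketchDoFn is a hypothetical stand-in for the real DoFn type.
type sketchDoFn struct {
	name      string
	numMainIn int
}

// newSketchDoFn starts from defaults and applies each option in order, so
// existing callers that pass no options keep the old behaviour.
func newSketchDoFn(name string, opts ...Option) *sketchDoFn {
	cfg := config{numMainIn: -1}
	for _, opt := range opts {
		opt(&cfg)
	}
	return &sketchDoFn{name: name, numMainIn: cfg.numMainIn}
}

func main() {
	a := newSketchDoFn("a")                   // no options: unknown
	b := newSketchDoFn("b", NumMainInputs(2)) // caller states a KV input
	fmt.Println(a.numMainIn, b.numMainIn)
}
```

The appeal of this design for the change under review is that a constructor can gain an optional hint without breaking callers that do not supply it.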


[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388608923
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,74 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// The following constants prefixed with "Main" represent possible numbers of
 
 Review comment:
   I definitely like those options better. Went with the unexported constant type, since it makes the code more self-documenting as opposed to raw numbers. Also removed the validation check on that parameter, like you suggested.


[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r391958266
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -239,52 +279,50 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 		return nil, addContext(err, fn)
 	}
 
-	// Start validating DoFn. First, check that ProcessElement has a main input.
+	// Validate ProcessElement has correct number of main inputs (as indicated by
+	// numMainIn), and that main inputs are before side inputs.
 	processFn := fn.methods[processElementName]
-	pos, num, ok := processFn.Inputs()
-	if ok {
-		first := processFn.Param[pos].Kind
-		if first != funcx.FnValue {
-			err := errors.New("side input parameters must follow main input parameter")
-			err = errors.SetTopLevelMsgf(err,
-				"Method %v of DoFns should always have a main input before side inputs, "+
-					"but it has side inputs (as Iters or ReIters) first in DoFn %v.",
-				processElementName, fn.Name())
-			err = errors.WithContextf(err, "method %v", processElementName)
-			return nil, addContext(err, fn)
-		}
+	if err := validateMainInputs(fn, processFn, processElementName, numMainIn); err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	// If numMainIn is unknown, we can try inferring it from the second input in ProcessElement.
+	// If there is none, or it's not a FnValue type, then we can safely infer that there's only
+	// one main input.
+	pos, num, _ := processFn.Inputs()
+	if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
+		numMainIn = MainSingle
 	}
 
 	// If the ProcessElement function includes side inputs or emit functions those must also be
 
 Review comment:
   It's part of the API for start/finishBundle. I don't remember why it's done that way though. lostluck@ might be able to answer why when he gets back.
   
   There might be room to make the side inputs/emits in start/finishBundle optional, but I believe right now it's mandatory (if we don't catch and throw an error here, it'll just break later on in translation or execution or something).


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-597891830
 
 
   R: @lukecwik 
   
   Adding Luke to finish up this review since Robert (lostluck@) is on vacation for a bit.
   
   Since this was already mostly reviewed, the main thing I'm looking for is someone to confirm that I addressed Robert's previous review comments with the latest commit. You don't need to review the full change.


[GitHub] [beam] youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388089432
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+	unknownInNum = -1 // Used when we don't know the number of main inputs.
+	singleInNum  = 1
+	kvInNum      = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
 	ret, err := NewFn(fn)
 	if err != nil {
 		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
 	}
-	return AsDoFn(ret)
+	return AsDoFn(ret, unknownInNum)
 }
 
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
+	ret, err := NewFn(fn)
+	if err != nil {
+		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
+	}
+
+	if mainKv {
+		return AsDoFn(ret, kvInNum)
+	} else {
+		return AsDoFn(ret, singleInNum)
+	}
+}
+
+// AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many
+// main inputs are expected in the DoFn's method signatures. Valid values are
+// -1 (unknown), 1 (single elements), or 2 (KVs). If the value is unknown then
+// validation is done by best effort and may miss some edge cases.
+func AsDoFn(fn *Fn, numMainIn int) (*DoFn, error) {
 	addContext := func(err error, fn *Fn) error {
 		return errors.WithContextf(err, "graph.AsDoFn: for Fn named %v", fn.Name())
 	}
 
+	// Validate numMainIn. This check should match this method's comment.
+	if numMainIn != unknownInNum &&
+		numMainIn != singleInNum &&
+		numMainIn != kvInNum {
+		err := errors.Errorf("invalid number of main inputs given. "+
+			"Got: %v, Want: One of the following: %v",
+			processElementName, []int{unknownInNum, singleInNum, kvInNum})
+		return nil, addContext(err, fn)
+	}
 
 Review comment:
   I like that much better, done.


[GitHub] [beam] lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r385468257
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+	unknownInNum = -1 // Used when we don't know the number of main inputs.
+	singleInNum  = 1
+	kvInNum      = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
 	ret, err := NewFn(fn)
 	if err != nil {
 		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
 	}
-	return AsDoFn(ret)
+	return AsDoFn(ret, unknownInNum)
 }
 
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
 
 Review comment:
   With the name NewDoFnKv, it sounds like it's already assuming that a DoFn KV is being passed in. It's OK for there to be special purpose methods that only do one thing.
   
   Another option, instead of having two (or N) methods: extend the current NewDoFn with a variadic Option type (e.g. opts ...Option). This lets existing callers keep things the same while leaving room to expand in the future. Option should probably be either a function type or an interface type with private methods, with the options provided by other methods in the package, e.g. graph.NewDoFn(fn, graph.HasKVInput(), graph.HasRestriction()). This is valuable if we think we'll need to expand things in the future, and it also lets us mix and match more easily later on. This way we keep the existing behavior when there are no options, and keep the documentation of all the various uses in one place on the NewDoFn method, which refers to the option-returning methods.
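
   A minimal sketch of the variadic-option idea described above, assuming a function-typed Option. Every name here (config, mainInputs, the simplified NewDoFn that only resolves options) is a hypothetical stand-in for illustration and is not the Beam graph package API.

   ```go
   package main

   import "fmt"

   // mainInputs is a hypothetical type for the number of main inputs.
   type mainInputs int

   const (
   	mainUnknown mainInputs = -1 // unexported default: used when no option is passed
   	mainSingle  mainInputs = 1
   	mainKV      mainInputs = 2
   )

   // config collects everything the options can set (hypothetical).
   type config struct {
   	numMainIn      mainInputs
   	hasRestriction bool
   }

   // Option mutates the config. A function type keeps new options cheap to add.
   type Option func(*config)

   // HasKVInput declares that the main input is a KV.
   func HasKVInput() Option { return func(c *config) { c.numMainIn = mainKV } }

   // HasRestriction declares that the DoFn carries a restriction.
   func HasRestriction() Option { return func(c *config) { c.hasRestriction = true } }

   // NewDoFn here only resolves the options into a config. A real constructor
   // would go on to validate fn against the resolved settings.
   func NewDoFn(fn interface{}, opts ...Option) config {
   	c := config{numMainIn: mainUnknown}
   	for _, opt := range opts {
   		opt(&c)
   	}
   	return c
   }

   func main() {
   	// Existing callers pass no options and keep the old behavior.
   	fmt.Printf("%+v\n", NewDoFn(nil))
   	// New callers opt in to stricter validation.
   	fmt.Printf("%+v\n", NewDoFn(nil, HasKVInput(), HasRestriction()))
   }
   ```

   With this shape the "unknown" value never needs to be exported: it is simply what the constructor uses when no hint option is supplied.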


[GitHub] [beam] lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r385893620
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+	unknownInNum = -1 // Used when we don't know the number of main inputs.
+	singleInNum  = 1
+	kvInNum      = 2
+)
+
 // NewDoFn constructs a DoFn from the given value, if possible.
 func NewDoFn(fn interface{}) (*DoFn, error) {
 	ret, err := NewFn(fn)
 	if err != nil {
 		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
 	}
-	return AsDoFn(ret)
+	return AsDoFn(ret, unknownInNum)
 }
 
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
+	ret, err := NewFn(fn)
+	if err != nil {
+		return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
+	}
+
+	if mainKv {
+		return AsDoFn(ret, kvInNum)
+	} else {
+		return AsDoFn(ret, singleInNum)
+	}
+}
+
+// AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many
+// main inputs are expected in the DoFn's method signatures. Valid values are
+// -1 (unknown), 1 (single elements), or 2 (KVs). If the value is unknown then
+// validation is done by best effort and may miss some edge cases.
+func AsDoFn(fn *Fn, numMainIn int) (*DoFn, error) {
 	addContext := func(err error, fn *Fn) error {
 		return errors.WithContextf(err, "graph.AsDoFn: for Fn named %v", fn.Name())
 	}
 
+	// Validate numMainIn. This check should match this method's comment.
+	if numMainIn != unknownInNum &&
+		numMainIn != singleInNum &&
+		numMainIn != kvInNum {
+		err := errors.Errorf("invalid number of main inputs given. "+
+			"Got: %v, Want: One of the following: %v",
+			processElementName, []int{unknownInNum, singleInNum, kvInNum})
+		return nil, addContext(err, fn)
+	}
 
 Review comment:
   Consider a switch instead.
   
   ```suggestion
   	switch numMainIn {
   	case unknownInNum, singleInNum, kvInNum: // Valid
   	default: // Invalid
   		err := errors.Errorf("invalid number of main inputs given. "+
   			"Got: %v, Want: One of the following: %v",
   			numMainIn, []int{unknownInNum, singleInNum, kvInNum})
   		return nil, addContext(err, fn)
   	}
   ```


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-596330576
 
 
   Run Go PostCommit


[GitHub] [beam] lukecwik commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r391766344
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -239,52 +279,50 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 		return nil, addContext(err, fn)
 	}
 
-	// Start validating DoFn. First, check that ProcessElement has a main input.
+	// Validate ProcessElement has correct number of main inputs (as indicated by
+	// numMainIn), and that main inputs are before side inputs.
 	processFn := fn.methods[processElementName]
-	pos, num, ok := processFn.Inputs()
-	if ok {
-		first := processFn.Param[pos].Kind
-		if first != funcx.FnValue {
-			err := errors.New("side input parameters must follow main input parameter")
-			err = errors.SetTopLevelMsgf(err,
-				"Method %v of DoFns should always have a main input before side inputs, "+
-					"but it has side inputs (as Iters or ReIters) first in DoFn %v.",
-				processElementName, fn.Name())
-			err = errors.WithContextf(err, "method %v", processElementName)
-			return nil, addContext(err, fn)
-		}
+	if err := validateMainInputs(fn, processFn, processElementName, numMainIn); err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	// If numMainIn is unknown, we can try inferring it from the second input in ProcessElement.
+	// If there is none, or it's not a FnValue type, then we can safely infer that there's only
+	// one main input.
+	pos, num, _ := processFn.Inputs()
+	if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
+		numMainIn = MainSingle
 	}
 
 	// If the ProcessElement function includes side inputs or emit functions those must also be
 
 Review comment:
   Not related to this PR but why?


[GitHub] [beam] youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
youngoli commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595523525
 
 
   Done: https://jira.apache.org/jira/browse/BEAM-9459


[GitHub] [beam] lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r388424329
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -209,21 +209,74 @@ func (f *DoFn) RestrictionT() *reflect.Type {
 // a KV or not based on the other signatures (unless we're more loose about which
 // sideinputs are present). Bind should respect that.
 
+// The following constants prefixed with "Main" represent possible numbers of
+// DoFn main inputs for DoFn construction and validation. Any value not defined
+// here is an invalid number of main inputs.
+const (
+	MainUnknown = -1 // The number of main inputs is unknown for DoFn validation.
 
 Review comment:
   Consider whether it is necessary to export an unknown constant at all. Even in the unexported type version of this code, Unknown is a side effect of not passing the NumMainInput hint, rather than something a user should explicitly set.


[GitHub] [beam] lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r393382209
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -239,52 +279,50 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 		return nil, addContext(err, fn)
 	}
 
-	// Start validating DoFn. First, check that ProcessElement has a main input.
+	// Validate ProcessElement has correct number of main inputs (as indicated by
+	// numMainIn), and that main inputs are before side inputs.
 	processFn := fn.methods[processElementName]
-	pos, num, ok := processFn.Inputs()
-	if ok {
-		first := processFn.Param[pos].Kind
-		if first != funcx.FnValue {
-			err := errors.New("side input parameters must follow main input parameter")
-			err = errors.SetTopLevelMsgf(err,
-				"Method %v of DoFns should always have a main input before side inputs, "+
-					"but it has side inputs (as Iters or ReIters) first in DoFn %v.",
-				processElementName, fn.Name())
-			err = errors.WithContextf(err, "method %v", processElementName)
-			return nil, addContext(err, fn)
-		}
+	if err := validateMainInputs(fn, processFn, processElementName, numMainIn); err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	// If numMainIn is unknown, we can try inferring it from the second input in ProcessElement.
+	// If there is none, or it's not a FnValue type, then we can safely infer that there's only
+	// one main input.
+	pos, num, _ := processFn.Inputs()
+	if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
+		numMainIn = MainSingle
 	}
 
 	// If the ProcessElement function includes side inputs or emit functions those must also be
 
 Review comment:
   At the most relaxed, we'd be able to either not require them at all if none are used, or isolate them by their types. All instances of a given side input or emit with the same type would need to be listed at once, since otherwise we have no way to distinguish them except by position. Permitting nothing to be set would be the most convenient, or permitting only the side inputs and not requiring the emits.
   
   For now though, it's better to be more strict now and relax later, since the inverse is impossible, and such variety is harder to maintain if unnecessary.
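
   A rough sketch of the strict, position-based rule discussed above: the parameters of StartBundle/FinishBundle must equal the parameters of ProcessElement that follow the main inputs. The helper matchesAfterMain and the sample functions are hypothetical and use plain reflection rather than the SDK's funcx types.

   ```go
   package main

   import (
   	"fmt"
   	"reflect"
   )

   // Hypothetical DoFn-shaped functions: one main input, then one side input
   // iterator and one emitter.
   func processElement(word string, side func(*int) bool, emit func(string)) {}

   // startBundle repeats the side input and emitter in the same order.
   func startBundle(side func(*int) bool, emit func(string)) {}

   // swappedBundle lists the same types in a different order.
   func swappedBundle(emit func(string), side func(*int) bool) {}

   // partialBundle omits the side input.
   func partialBundle(emit func(string)) {}

   // matchesAfterMain reports whether bundle's parameters equal process's
   // parameters once the first numMainIn are skipped, compared by position.
   func matchesAfterMain(process, bundle interface{}, numMainIn int) bool {
   	p, b := reflect.TypeOf(process), reflect.TypeOf(bundle)
   	if p.NumIn()-numMainIn != b.NumIn() {
   		return false
   	}
   	for i := 0; i < b.NumIn(); i++ {
   		if p.In(i+numMainIn) != b.In(i) {
   			return false
   		}
   	}
   	return true
   }

   func main() {
   	fmt.Println(matchesAfterMain(processElement, startBundle, 1))
   	fmt.Println(matchesAfterMain(processElement, swappedBundle, 1))
   	fmt.Println(matchesAfterMain(processElement, partialBundle, 1))
   }
   ```

   Relaxing the rule later (for example, allowing partialBundle) would only widen what this check accepts, which is why starting strict keeps the option open.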


[GitHub] [beam] lukecwik commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r391771719
 
 

 ##########
 File path: sdks/go/pkg/beam/core/graph/fn.go
 ##########
 @@ -239,52 +279,50 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 		return nil, addContext(err, fn)
 	}
 
-	// Start validating DoFn. First, check that ProcessElement has a main input.
+	// Validate ProcessElement has correct number of main inputs (as indicated by
+	// numMainIn), and that main inputs are before side inputs.
 	processFn := fn.methods[processElementName]
-	pos, num, ok := processFn.Inputs()
-	if ok {
-		first := processFn.Param[pos].Kind
-		if first != funcx.FnValue {
-			err := errors.New("side input parameters must follow main input parameter")
-			err = errors.SetTopLevelMsgf(err,
-				"Method %v of DoFns should always have a main input before side inputs, "+
-					"but it has side inputs (as Iters or ReIters) first in DoFn %v.",
-				processElementName, fn.Name())
-			err = errors.WithContextf(err, "method %v", processElementName)
-			return nil, addContext(err, fn)
-		}
+	if err := validateMainInputs(fn, processFn, processElementName, numMainIn); err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	// If numMainIn is unknown, we can try inferring it from the second input in ProcessElement.
+	// If there is none, or it's not a FnValue type, then we can safely infer that there's only
+	// one main input.
+	pos, num, _ := processFn.Inputs()
+	if numMainIn == MainUnknown && (num == 1 || processFn.Param[pos+1].Kind != funcx.FnValue) {
 
 Review comment:
   Wouldn't it make sense to infer the number of inputs before validateMainInputs?
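
   One way the reordering asked about above could look, as a sketch only. Parameter kinds are reduced to a hypothetical two-value enum in place of funcx kinds, and inferMainInputs and this simplified validateMainInputs are illustrative, not the functions in fn.go. Note that when inference runs first it has to guard against a short parameter list itself, because validation has not yet confirmed that a main input exists.

   ```go
   package main

   import (
   	"errors"
   	"fmt"
   )

   // kind is a hypothetical stand-in for the parameter kinds in funcx.
   type kind int

   const (
   	value kind = iota // plays the role of funcx.FnValue
   	iter              // plays the role of a side input such as an Iter
   )

   const (
   	mainUnknown = -1
   	mainSingle  = 1
   	mainKV      = 2
   )

   // inferMainInputs returns hint unchanged unless it is unknown and the second
   // parameter is absent or not a value, in which case there is one main input.
   func inferMainInputs(params []kind, hint int) int {
   	if hint != mainUnknown {
   		return hint
   	}
   	if len(params) == 1 || (len(params) > 1 && params[1] != value) {
   		return mainSingle
   	}
   	return hint
   }

   // validateMainInputs checks that main inputs come first and, when a KV is
   // expected, that two value parameters lead the list.
   func validateMainInputs(params []kind, numMainIn int) error {
   	if len(params) == 0 || params[0] != value {
   		return errors.New("side input parameters must follow main input parameter")
   	}
   	if numMainIn == mainKV && (len(params) < 2 || params[1] != value) {
   		return errors.New("expected two main inputs for a KV")
   	}
   	return nil
   }

   func main() {
   	params := []kind{value, iter}
   	n := inferMainInputs(params, mainUnknown) // infer first...
   	err := validateMainInputs(params, n)      // ...then validate with the result
   	fmt.Println(n, err)
   }
   ```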


[GitHub] [beam] lostluck commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595519138
 
 
   No, but it looks like it's somehow related to mine. I'm going to roll it back.
   


[GitHub] [beam] lostluck commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #10991: [BEAM-3301] Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#issuecomment-595520053
 
 
   Could you file a JIRA with the trace and assign it to me please? I'm in the middle of packing.
   https://github.com/apache/beam/pull/11061 is the revert.
