You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/11 05:06:18 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11976: [BEAM-10169] - ParDo functions with correct output N in their error messages.

lostluck commented on a change in pull request #11976:
URL: https://github.com/apache/beam/pull/11976#discussion_r438475207



##########
File path: sdks/go/pkg/beam/core/util/reflectx/functions_test.go
##########
@@ -41,3 +45,19 @@ func TestLoadFunction(t *testing.T) {
 		t.Errorf("got %d, wanted %d", out[0].Int(), testFunction())
 	}
 }
+
+func TestFunctionOutputSize(t *testing.T) {
+	expected := 1
+	received := FunctionOutputSize(testFunction)
+	if received != expected {
+		t.Errorf("got %d, wanted %d", received, expected)
+	}
+}
+
+func TestFunction2OutputSize(t *testing.T) {

Review comment:
       Conventionally, if there are multiple cases for a given test, they should be combined into the same function, and run as a loop.
   
   See https://gobyexample.com/testing for an excellent example.
   
   Otherwise, this function should be named TestFunctionOutputSize_2

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {
+	doFnName := reflectx.FunctionName(doFn)
+	doFnOutSize := reflectx.FunctionOutputSize(doFn)
+
+	parDoName := reflectx.FunctionName(parDo)
+	parDoOutSize := reflectx.FunctionOutputSize(parDo)
+
+	useParDo := reflectx.FunctionName(RecommendParDo(doFnOutSize))
+	return fmt.Sprintf("DoFn %v has %v outptus, but %v requires %v outputs, Use %v instead.", doFnName, doFnOutSize, parDoName, parDoOutSize, useParDo)
+
+}
+
+// recommendParDo takes a in a DoFns emit dimension and recommends the correct
+// ParDo to use.
+func RecommendParDo(emitDim int) interface{} {
+	switch {
+	case emitDim == 0:
+		return ParDo0
+	case emitDim == 1:
+		return ParDo
+	case emitDim == 2:
+		return ParDo2
+	case emitDim == 3:
+		return ParDo3
+	case emitDim == 4:
+		return ParDo4
+	case emitDim == 5:
+		return ParDo5
+	case emitDim == 6:
+		return ParDo6
+	case emitDim == 7:
+		return ParDo7
+	}
+	return ParDoN
+}

Review comment:
       What I'd recommend here instead though is to return a string instead of the function.
   ```
   switch emitDim {
    case 0,2,3,4,5,6,7:
     return fmt.Sprintf("ParDo%d", emitDim)
   case 1:
     return "ParDo"  
   default:
     return "ParDoN"
   }
   ```
   
   Easier to read, and see if it's correct. Read [Effective Go for more about Switches](https://golang.org/doc/effective_go.html#switch)

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {
+	doFnName := reflectx.FunctionName(doFn)
+	doFnOutSize := reflectx.FunctionOutputSize(doFn)
+
+	parDoName := reflectx.FunctionName(parDo)
+	parDoOutSize := reflectx.FunctionOutputSize(parDo)
+
+	useParDo := reflectx.FunctionName(RecommendParDo(doFnOutSize))
+	return fmt.Sprintf("DoFn %v has %v outptus, but %v requires %v outputs, Use %v instead.", doFnName, doFnOutSize, parDoName, parDoOutSize, useParDo)
+
+}
+
+// recommendParDo takes a in a DoFns emit dimension and recommends the correct
+// ParDo to use.
+func RecommendParDo(emitDim int) interface{} {

Review comment:
       Same comment here, WRT exported functions being part of the external API. Please rename this to recommendParDo.

##########
File path: sdks/go/test/regression/pardo_test.go
##########
@@ -16,6 +16,8 @@
 package regression

Review comment:
       Consider putting this into the beam_test package rather than in the Go regression test package. This package is intended for bugs when interacting with runners, which is harder to test independently in the Go unit tests.
   
   In particular, I recommend adding a new test file:
   .../sdks/go/pkg/beam/pardo_test.go 
   
   The main "gotcha" is you should copy the License header to the top of the file, as that's a requirement for the project.

##########
File path: sdks/go/test/regression/pardo_test.go
##########
@@ -56,3 +58,39 @@ func TestEmitParDoAfterGBK(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+// Keep the ParDo misalignment messages concise.
+// FIXME: Review
+// [beam-10169] identified that the error message returned to the user when the
+// DoFn output is misaligned with the ParDo output was unclear. in order to
+// make the user debug experience shorter, a more concise error message is
+// required.
+//
+// This suite of brittle tests are to ensure that the returned errors are adhere
+// to the ParDo API provided by the go sdk.
+
+func TestRecommendParDoWithOneOutput(t *testing.T) {

Review comment:
       Following from the comment I made before, if this test remains standalone, TestRecommendParDo_OneOutput is the idiomatic name. The general form is Test<Func/struct under test>_<case>

##########
File path: sdks/go/pkg/beam/core/util/reflectx/functions.go
##########
@@ -45,3 +45,13 @@ func LoadFunction(ptr uintptr, t reflect.Type) interface{} {
 	*(*unsafe.Pointer)(unsafe.Pointer(v.Addr().Pointer())) = unsafe.Pointer(p)
 	return v.Interface()
 }
+
+//FunctionOutputSize returns the dimensions of the output of a function.
+// Panics if the type is not a function.
+func FunctionOutputSize(fn interface{}) int {

Review comment:
       Per other comments, I don't think this function is necessary. As fun and cool as reflection is, using it should be avoided unless there is no alternative, because it complicates maintenance.
   
   1. DoFns can be structs instead, with a ProcessElement method.
   2. DoFn outputs in a function signature aren't directly related to the function outputs, so this is incorrect.
   
   DoFns can have "emitter functions" as parameters, which are used to output 0 or more values to the Nth PCollection of the ParDo. The main return (if one exists) is always the first PCollection, and they proceed from there. Similarly
   
   eg. a function with the signature
   `func(K,V string, e1, e2, e3 func(string)) (string, string)` would require a ParDo4 since it takes a PCollection<KV<string,string>> as an input, and has 4 outputs.
   The first output is a KV<string,string>, and then there are 3 PCollection<string> as well.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {

Review comment:
       In Go, identifiers that start with a capital letter are exported and accessible by code importing the package. As such, exported functions form the package's external API. Note that this doesn't prevent the function from being unit tested in the associated _test.go file.
   
   We don't want this function as part of the Beam API surface, as it's an internal implementation detail, that isn't necessary for users to invoke directly.
   
   Further, the "er" suffix idiomatically implies that it's an interface, e.g. Closer, Writer, and Reader, which are interfaces for values with Close, Write, and Read methods respectively.
   
   So with all that together, consider renaming it formatParDoError.

##########
File path: sdks/go/pkg/beam/core/util/reflectx/functions.go
##########
@@ -45,3 +45,13 @@ func LoadFunction(ptr uintptr, t reflect.Type) interface{} {
 	*(*unsafe.Pointer)(unsafe.Pointer(v.Addr().Pointer())) = unsafe.Pointer(p)
 	return v.Interface()
 }
+
+//FunctionOutputSize returns the dimensions of the output of a function.

Review comment:
       Style wise, you should have a space after the // and before the function name.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {
+	doFnName := reflectx.FunctionName(doFn)
+	doFnOutSize := reflectx.FunctionOutputSize(doFn)
+
+	parDoName := reflectx.FunctionName(parDo)
+	parDoOutSize := reflectx.FunctionOutputSize(parDo)
+
+	useParDo := reflectx.FunctionName(RecommendParDo(doFnOutSize))
+	return fmt.Sprintf("DoFn %v has %v outptus, but %v requires %v outputs, Use %v instead.", doFnName, doFnOutSize, parDoName, parDoOutSize, useParDo)
+
+}
+
+// recommendParDo takes a in a DoFns emit dimension and recommends the correct
+// ParDo to use.
+func RecommendParDo(emitDim int) interface{} {
+	switch {
+	case emitDim == 0:
+		return ParDo0
+	case emitDim == 1:
+		return ParDo
+	case emitDim == 2:
+		return ParDo2
+	case emitDim == 3:
+		return ParDo3
+	case emitDim == 4:
+		return ParDo4
+	case emitDim == 5:
+		return ParDo5
+	case emitDim == 6:
+		return ParDo6
+	case emitDim == 7:
+		return ParDo7
+	}
+	return ParDoN
+}

Review comment:
       Switch statements in Go can accept a value, so this can be rewritten as
   
   ```suggestion
   	switch emitDim {
   	case 0:
   		return ParDo0
   	case 1:
   		return ParDo
   	case 2:
   		return ParDo2
   	case 3:
   		return ParDo3
   	case  4:
   		return ParDo4
   	case 5:
   		return ParDo5
   	case 6:
   		return ParDo6
   	case 7:
   		return ParDo7
   	}
   	return ParDoN
   }
   ```

##########
File path: sdks/go/pkg/beam/core/util/reflectx/functions_test.go
##########
@@ -41,3 +45,19 @@ func TestLoadFunction(t *testing.T) {
 		t.Errorf("got %d, wanted %d", out[0].Int(), testFunction())
 	}
 }
+
+func TestFunctionOutputSize(t *testing.T) {
+	expected := 1
+	received := FunctionOutputSize(testFunction)
+	if received != expected {
+		t.Errorf("got %d, wanted %d", received, expected)
+	}
+}
+
+func TestFunction2OutputSize(t *testing.T) {
+	expected := 2
+	received := FunctionOutputSize(testFunction2)

Review comment:
       Idiomatically, Go uses "got and want" as the variables, as well as in the test error output.

##########
File path: sdks/go/pkg/beam/core/util/reflectx/functions_test.go
##########
@@ -41,3 +45,19 @@ func TestLoadFunction(t *testing.T) {
 		t.Errorf("got %d, wanted %d", out[0].Int(), testFunction())
 	}
 }
+
+func TestFunctionOutputSize(t *testing.T) {
+	expected := 1
+	received := FunctionOutputSize(testFunction)
+	if received != expected {
+		t.Errorf("got %d, wanted %d", received, expected)
+	}
+}
+
+func TestFunction2OutputSize(t *testing.T) {
+	expected := 2
+	received := FunctionOutputSize(testFunction2)
+	if received != expected {
+		t.Errorf("got %d, wanted %d", received, expected)

Review comment:
       It should be possible for a user to not look at the test code to understand what happened for this test.
   
   So a better test output would be:
   t.Errorf("FunctionOutputSize(%v) =  %d, want %d", <testfunctionname/signature>,received, expected)

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {
+	doFnName := reflectx.FunctionName(doFn)
+	doFnOutSize := reflectx.FunctionOutputSize(doFn)
+
+	parDoName := reflectx.FunctionName(parDo)
+	parDoOutSize := reflectx.FunctionOutputSize(parDo)
+
+	useParDo := reflectx.FunctionName(RecommendParDo(doFnOutSize))
+	return fmt.Sprintf("DoFn %v has %v outptus, but %v requires %v outputs, Use %v instead.", doFnName, doFnOutSize, parDoName, parDoOutSize, useParDo)

Review comment:
       typo: "outptus" should be "outputs".
   The "Use" should be "use".

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {

Review comment:
       Note that regardless of the DoFn we need to check the expectations for every value against a given constant.
   
   Consider wrapping the whole if block in the new function instead, and passing in the DoFn (so we can get the name), ret for the number of outputs, and the expected number of outputs from the constant. The function can be called mustHaveOutputs() . This way we can avoid specifying the expected number of outputs twice (in the if condition, and in the message) because it'll be in a function parameter.
   
   (In Go, a Must prefix indicates the function will panic if the operation fails, rather than return an error. Usually they're discouraged, but in this case we have no choice but to panic. The ParDo functions could be described as Must functions, but that would get tiring WRT pipeline construction very quickly, so the shorter version without must, but with a Try prefix variant for the one Error handling version (TryParDo) was chosen instead.)

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -414,7 +414,45 @@ func ParDo6(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollec
 func ParDo7(s Scope, dofn interface{}, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection, PCollection, PCollection, PCollection) {
 	ret := MustN(TryParDo(s, dofn, col, opts...))
 	if len(ret) != 7 {
-		panic(fmt.Sprintf("expected 7 output. Found: %v", ret))
+		panic(ParDoErrorFormatter(dofn, ParDo7))
 	}
 	return ret[0], ret[1], ret[2], ret[3], ret[4], ret[5], ret[6]
 }
+
+//ParDoErrorFormatter is a helper function to provide a more concise error
+// message to the users when a DoFn and its ParDo pairing is incorrect.
+func ParDoErrorFormatter(doFn interface{}, parDo interface{}) string {
+	doFnName := reflectx.FunctionName(doFn)

Review comment:
       DoFns in the Go SDK can be structs instead of funcs, which means these functions will panic with less helpful error messages since they won't be functions.
   
   However, we know about this already, so we have a suite of Go SDK internal functions to handle this. In this case you want graph.NewFn, which returns a graph.Fn that has a Name() method which can be used to get the DoFn's name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org