You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/06/29 17:35:34 UTC

[beam] branch master updated: [BEAM-14347] Update docs to prefer generic registration functions (#17799)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fd5e2f4f01a [BEAM-14347] Update docs to prefer generic registration functions (#17799)
fd5e2f4f01a is described below

commit fd5e2f4f01a68b19e300b747b77548037e84183b
Author: Danny McCormick <da...@google.com>
AuthorDate: Wed Jun 29 13:35:28 2022 -0400

    [BEAM-14347] Update docs to prefer generic registration functions (#17799)
    
    * [BEAM-14347] Update docs to prefer generic registration functions
    
    * A little cleanup
    
    * register.Function changes
    
    * Type inference is nice
    
    * Extraneous registerType
---
 sdks/go/examples/snippets/04transforms.go          | 41 +++++++----
 .../content/en/documentation/programming-guide.md  | 79 ++++++++++++++++++----
 2 files changed, 95 insertions(+), 25 deletions(-)

diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 0713fa7a9d6..7fea2cfb155 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
 )
 
@@ -43,6 +44,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
 // DoFns must be registered with beam.
 func init() {
 	beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
+	// 2 inputs and 0 outputs => DoFn2x0
+	// 1 input => Emitter1
+	// Input/output types are included in order in the brackets
+	register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{})
+	register.Emitter1[int]()
 }
 
 // [END model_pardo_pardo]
@@ -88,9 +94,8 @@ func splitStringPair(e stringPair) (string, string) {
 }
 
 func init() {
-	// Register element types and DoFns.
-	beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem())
-	beam.RegisterFunction(splitStringPair)
+	// Register DoFn.
+	register.Function1x2(splitStringPair)
 }
 
 // CreateAndSplit is a helper function that creates
@@ -212,7 +217,9 @@ func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) str
 }
 
 func init() {
-	beam.RegisterFunction(formatCoGBKResults)
+	register.Function3x1(formatCoGBKResults)
+	// 1 input of type string => Iter1[string]
+	register.Iter1[string]()
 }
 
 // [END cogroupbykey_output_helpers]
@@ -266,7 +273,7 @@ func sumInts(a, v int) int {
 }
 
 func init() {
-	beam.RegisterFunction(sumInts)
+	register.Function2x1(sumInts)
 }
 
 func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection {
@@ -286,7 +293,7 @@ func (fn *boundedSum) MergeAccumulators(a, v int) int {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*boundedSum)(nil)))
+	register.Combiner1[int](&boundedSum{})
 }
 
 func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection {
@@ -323,7 +330,7 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*averageFn)(nil)))
+	register.Combiner3[averageAccum, int, float64](&averageFn{})
 }
 
 // [END combine_custom_average]
@@ -380,7 +387,7 @@ func decileFn(student Student) int {
 }
 
 func init() {
-	beam.RegisterFunction(decileFn)
+	register.Function1x1(decileFn)
 }
 
 // [END model_multiple_pcollections_partition_fn]
@@ -427,8 +434,12 @@ func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff func(st
 }
 
 func init() {
-	beam.RegisterFunction(filterWordsAbove)
-	beam.RegisterFunction(filterWordsBelow)
+	register.Function3x1(filterWordsAbove)
+	register.Function3x0(filterWordsBelow)
+	// 1 input of type string => Emitter1[string]
+	register.Emitter1[string]()
+	// 1 input of type float64 => Iter1[float64]
+	register.Iter1[float64]()
 }
 
 // [END model_pardo_side_input_dofn]
@@ -480,8 +491,10 @@ func processWordsMixed(word string, emitMarked func(string)) int {
 }
 
 func init() {
-	beam.RegisterFunction(processWords)
-	beam.RegisterFunction(processWordsMixed)
+	register.Function4x0(processWords)
+	register.Function2x1(processWordsMixed)
+	// 1 input of type string => Emitter1[string]
+	register.Emitter1[string]()
 }
 
 // [END model_multiple_output_dofn]
@@ -525,7 +538,9 @@ func extractWordsFn(pn beam.PaneInfo, line string, emitWords func(string)) {
 // [END model_paneinfo]
 
 func init() {
-	beam.RegisterFunction(extractWordsFn)
+	register.Function3x0(extractWordsFn)
+	// 1 input of type string => Emitter1[string]
+	register.Emitter1[string]()
 }
 
 // [START countwords_composite]
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 15e9a213053..04817bd86da 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -432,12 +432,6 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism
 that includes built-in encoding for commonly-used types as well as support for
 specifying custom encodings as needed.
 
-{{< paragraph class="language-go" >}}
-Custom struct types should be registered with beam using `beam.RegisterType`.
-Among other things, this allows the Go SDK to infer an encoding from their
-exported fields. Unexported fields in struct types are ignored.
-{{< /paragraph >}}
-
 #### 3.2.2. Element schema {#element-schema}
 
 In many cases, the element type in a `PCollection` has a structure that can be introspected.
@@ -704,6 +698,31 @@ processing function.
 
 </span>
 
+{{< paragraph class="language-go" >}}
+All DoFns should be registered using a generic `register.DoFnXxY[...]`
+function. This allows the Go SDK to infer an encoding from any inputs/outputs,
+registers the DoFn for execution on remote runners, and optimizes the runtime
+execution of the DoFns via reflection.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+// ComputeWordLengthFn is a DoFn that computes the word length of string elements.
+type ComputeWordLengthFn struct{}
+
+// ProcessElement computes the length of word and emits the result.
+// When creating structs as a DoFn, the ProcessElement method performs the
+// work of this step in the pipeline.
+func (fn *ComputeWordLengthFn) ProcessElement(ctx context.Context, word string) int {
+   ...
+}
+
+func init() {
+  // 2 inputs and 1 output => DoFn2x1
+  // Input/output types are included in order in the brackets
+	register.DoFn2x1[context.Context, string, int](&ComputeWordLengthFn{})
+}
+{{< /highlight >}}
+
 ##### 4.2.1.1. Applying ParDo {#applying-pardo}
 
 {{< paragraph class="language-java language-py" >}}
@@ -799,7 +818,11 @@ func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
+  // 2 inputs and 0 outputs => DoFn2x0
+  // 1 input => Emitter1
+  // Input/output types are included in order in the brackets
+	register.DoFn2x0[string, func(int)](&ComputeWordLengthFn{})
+	register.Emitter1[int]()
 }
 {{< /highlight >}}
 
@@ -862,7 +885,11 @@ Simple DoFns can also be written as functions.
 func ComputeWordLengthFn(word string, emit func(int)) { ... }
 
 func init() {
-	beam.RegisterFunction(ComputeWordLengthFn)
+  // 2 inputs and 0 outputs => Function2x0
+  // 1 input => Emitter1
+  // Input/output types are inferred from the function signature
+  register.Function2x0(ComputeWordLengthFn)
+  register.Emitter1[int]()
 }
 {{< /highlight >}}
 
@@ -1027,7 +1054,7 @@ var words beam.PCollection = ...
 <span class="language-go" >
 
 > **Note:** Anonymous function DoFns may not work on distributed runners.
-> It's recommended to use named functions and register them with `beam.RegisterFunction` in
+> It's recommended to use named functions and register them with `register.FunctionXxY` in
 > an `init()` block.
 
 </span>
@@ -1333,6 +1360,26 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}}
 {{< /highlight >}}
 
+{{< paragraph class="language-go" >}}
+All Combiners should be registered using a generic `register.CombinerX[...]`
+function. This allows the Go SDK to infer an encoding from any inputs/outputs,
+registers the Combiner for execution on remote runners, and optimizes the runtime
+execution of the Combiner via reflection.
+
+Combiner1 should be used when your accumulator, input, and output are all of the
+same type. It can be called with `register.Combiner1[T](&CustomCombiner{})` where `T`
+is the type of the input/accumulator/output.
+
+Combiner2 should be used when your accumulator, input, and output are 2 distinct
+types. It can be called with `register.Combiner2[T1, T2](&CustomCombiner{})` where
+`T1` is the type of the accumulator and `T2` is the other type.
+
+Combiner3 should be used when your accumulator, input, and output are 3 distinct
+types. It can be called with `register.Combiner3[T1, T2, T3](&CustomCombiner{})`
+where `T1` is the type of the accumulator, `T2` is the type of the input, and `T3` is
+the type of the output.
+{{< /paragraph >}}
+
 {{< highlight go >}}
 {{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum >}}
 {{< /highlight >}}
@@ -1714,9 +1761,10 @@ a remote worker in your processing cluster.
 as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`;
 however, your subclass must not add any non-serializable members.</span>
 <span class="language-go">Funcs are serializable as long as
-they are registered with `beam.RegisterFunction`, and are not
-closures. Structural `DoFn`s will have all exported fields serialized.
-Unexported fields are unable to be serialized, and will be silently ignored.</span>
+they are registered with `register.FunctionXxY` (for simple functions) or
+`register.DoFnXxY` (for structural DoFns), and are not closures. Structural
+`DoFn`s will have all exported fields serialized. Unexported fields are unable to
+be serialized, and will be silently ignored.</span>
 
 Some other serializability factors you should keep in mind are:
 
@@ -1777,6 +1825,10 @@ processing each element in the input `PCollection`, but the additional data
 needs to be determined at runtime (and not hard-coded). Such values might be
 determined by the input data, or depend on a different branch of your pipeline.
 
+{{< paragraph class="language-go" >}}
+All side input iterables should be registered using a generic `register.IterX[...]`
+function. This optimizes runtime execution of the iterable.
+{{< /paragraph >}}
 
 #### 4.4.1. Passing side inputs to ParDo {#side-inputs-pardo}
 
@@ -1968,6 +2020,9 @@ multiple output PCollections.
 Call emitter functions as needed to produce 0 or more elements for its matching
 `PCollection`. The same value can be emitted with multiple emitters.
 As normal, do not mutate values after emitting them from any emitter.
+
+All emitters should be registered using a generic `register.EmitterX[...]`
+function. This optimizes runtime execution of the emitter.
 {{< /paragraph >}}
 
 {{< paragraph class="language-go" >}}