You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/09/08 13:27:42 UTC
[beam] branch master updated: clean up comments and register functional DoFn in wordcount.go (#23057)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 50a3bad014d clean up comments and register functional DoFn in wordcount.go (#23057)
50a3bad014d is described below
commit 50a3bad014d1371cc69c57404b4cc4593bdfc255
Author: David Huntsperger <56...@users.noreply.github.com>
AuthorDate: Thu Sep 8 06:27:33 2022 -0700
clean up comments and register functional DoFn in wordcount.go (#23057)
* clean up comments and register functional DoFn in wordcount.go
* updates in response to review
* update extractFn in comment to use a Go func signature
---
sdks/go/examples/wordcount/wordcount.go | 92 ++++++++++++++++++---------------
1 file changed, 51 insertions(+), 41 deletions(-)
diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index f09783de064..f875b45ce0e 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -13,29 +13,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// wordcount is an example that counts words in Shakespeare and includes Beam
-// best practices.
+// wordcount is an example that counts words in Shakespeare and demonstrates
+// Beam best practices.
//
// This example is the second in a series of four successively more detailed
-// 'word count' examples. You may first want to take a look at minimal_wordcount.
-// After you've looked at this example, then see the debugging_workcount
-// pipeline, for introduction of additional concepts.
+// 'word count' examples. You may first want to take a look at
+// minimal_wordcount. After you've looked at this example, see the
+// debugging_wordcount pipeline for an introduction to additional concepts.
//
// For a detailed walkthrough of this example, see
//
-// https://beam.apache.org/get-started/wordcount-example/
+// https://beam.apache.org/get-started/wordcount-example/
//
-// Basic concepts, also in the minimal_wordcount example: Reading text files;
-// counting a PCollection; writing to text files
+// Basic concepts, also in the minimal_wordcount example: reading text files;
+// counting a PCollection; writing to text files.
//
-// New Concepts:
+// New concepts:
//
-// 1. Executing a Pipeline both locally and using the selected runner
-// 2. Defining your own pipeline options
-// 3. Using ParDo with static DoFns defined out-of-line
-// 4. Building a composite transform
+// 1. Executing a pipeline both locally and using the selected runner
+// 2. Defining your own pipeline options
+// 3. Using ParDo with static DoFns defined out-of-line
+// 4. Building a composite transform
//
-// Concept #1: you can execute this pipeline either locally or using by
+// Concept #1: You can execute this pipeline either locally or by
// selecting another runner. These are now command-line options added by
// the 'beamx' package and not hard-coded as they were in the minimal_wordcount
// example. The 'beamx' package also registers all included runners and
@@ -43,15 +43,15 @@
//
// To change the runner, specify:
//
-// --runner=YOUR_SELECTED_RUNNER
+// --runner=YOUR_SELECTED_RUNNER
//
// To execute this pipeline, specify a local output file (if using the
// 'direct' runner) or a remote file on a supported distributed file system.
//
-// --output=[YOUR_LOCAL_FILE | YOUR_REMOTE_FILE]
+// --output=[YOUR_LOCAL_FILE | YOUR_REMOTE_FILE]
//
// The input file defaults to a public data set containing the text of King
-// Lear, by William Shakespeare. You can override it and choose your own input
+// Lear by William Shakespeare. You can override it and choose your own input
// with --input.
package main
@@ -82,7 +82,7 @@ import (
)
// Concept #2: Defining your own configuration options. Pipeline options can
-// just be standard Go flags (or be obtained any other way). Defining and
+// be standard Go flags, or they can be obtained any other way. Defining and
// configuring the pipeline is normal Go code.
var (
// By default, this example reads from a public dataset containing the text of
@@ -93,39 +93,47 @@ var (
output = flag.String("output", "", "Output file (required).")
)
-// Concept #3: You can make your pipeline assembly code less verbose and by
+// Concept #3: You can make your pipeline assembly code less verbose by
// defining your DoFns statically out-of-line. A DoFn can be defined as a Go
-// function and is conventionally suffixed "Fn". The argument and return types
-// dictate the pipeline shape when used in a ParDo: for example,
+// function and is conventionally suffixed "Fn". Using named function
+// transforms allows for easy reuse, modular testing, and an improved monitoring
+// experience. The argument and return types of a function dictate the pipeline
+// shape when used in a ParDo. For example,
//
-// formatFn: string x int -> string
+// func formatFn(w string, c int) string
//
-// indicate that it operates on a PCollection of type KV<string,int>, representing
-// key value pairs of strings and ints, and outputs a PCollection of type string.
-// Beam typechecks the pipeline before running it.
+// indicates that the function operates on a PCollection of type KV<string,int>,
+// representing key-value pairs of strings and ints, and outputs a PCollection
+// of type string. Beam typechecks the pipeline before running it.
//
-// DoFns that potentially output zero or multiple elements can also be Go functions,
-// but have a different signature. For example,
+// DoFns that potentially output zero or multiple elements can also be Go
+// functions, but have a different signature. For example,
//
-// extractFn : string x func(string) -> ()
+// func extractFn(w string, emit func(string))
//
-// uses an "emit" function argument instead of string return type to allow it to
-// output any number of elements. It operates on a PCollection of type string and
-// returns a PCollection of type string. Also, using named function transforms allows
-// for easy reuse, modular testing, and an improved monitoring experience.
+// uses an "emit" function argument instead of a string return type to allow it
+// to output any number of elements. It operates on a PCollection of type string
+// and returns a PCollection of type string.
//
// DoFns must be registered with Beam in order to be executed in ParDos. This is
// done automatically by the starcgen code generator, or it can be done manually
// in an init() call, either with beam.RegisterFunction or, as below, with the
// generic helpers in the register package.
func init() {
- // register.DoFnXxY registers a struct DoFn so that it can be correctly serialized and does some optimization
- // to avoid runtime reflection. Since extractFn has 3 inputs and 0 outputs, we use register.DoFn3x0 and provide
- // its input types as its constraints (if it had any outputs, we would add those as constraints as well).
- // Struct DoFns must be registered for a pipeline to run.
+ // register.DoFnXxY registers a struct DoFn so that it can be correctly
+ // serialized, and it does some optimization to avoid runtime reflection.
+ // Since extractFn's ProcessElement has 3 inputs and 0 outputs, we use
+ // register.DoFn3x0 and provide its input types as its constraints (if it
+ // had any outputs, we would add those as constraints as well). Struct DoFns
+ // must be registered for a pipeline to run.
register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
- // register.EmitterX is optional and will provide some optimization to make things run faster. Any emitters
- // (functions that produce output for the next step) should be registered. Here we register all emitters with
- // the signature func(string).
+ // register.FunctionXxY registers a functional DoFn to optimize execution at
+ // runtime. formatFn has 2 inputs and 1 output, so we use
+ // register.Function2x1.
+ register.Function2x1(formatFn)
+ // register.EmitterX is optional and will provide some optimization to make
+ // things run faster. Any emitters (functions that produce output for the next
+ // step) should be registered. Here we register all emitters with the
+ // signature func(string).
register.Emitter1[string]()
}
@@ -137,7 +145,9 @@ var (
lineLen = beam.NewDistribution("extract", "lineLenDistro")
)
-// extractFn is a DoFn that emits the words in a given line and keeps a count for small words.
+// extractFn is a structural DoFn that emits the words in a given line and keeps
+// a count for small words. Its ProcessElement function will be invoked on each
+// element in the input PCollection.
type extractFn struct {
	// SmallWordLength is encoded under the JSON key "smallWordLength" (see the
	// struct tag). NOTE(review): presumably the length threshold used to decide
	// which words count as "small" — confirm against ProcessElement.
	SmallWordLength int `json:"smallWordLength"`
}
@@ -157,7 +167,7 @@ func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(s
}
}
-// formatFn is a DoFn that formats a word and its count as a string.
+// formatFn is a functional DoFn that formats a word and its count as a string.
func formatFn(w string, c int) string {
	// Render the pair as "<word>: <count>", one output string per input pair.
	line := fmt.Sprintf("%s: %v", w, c)
	return line
}