You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/06/22 23:20:35 UTC
[beam-starter-go] 13/17: address review comments
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-starter-go.git
commit 5d5be4b3f099e7b1e3336d56713f7710ad573b87
Author: David Cavazos <dc...@google.com>
AuthorDate: Fri Jun 17 14:05:25 2022 -0700
address review comments
---
.github/workflows/test.yaml | 2 +-
main.go | 29 +++++++++++++++++++----------
main_test.go | 2 +-
3 files changed, 21 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index e5e3684..4f81103 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -17,6 +17,6 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
- go-version: '>=1.18.0'
+ go-version: go.mod
- run: go version
- run: go test ./...
diff --git a/main.go b/main.go
index 41c3c27..9c513c5 100644
--- a/main.go
+++ b/main.go
@@ -3,30 +3,39 @@ package main
import (
"context"
"flag"
- "fmt"
"log"
+ "strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
var (
- input_text = flag.String("input-text", "Default input text", "Input text to print.")
+ input_text = flag.String("input-text", "default input text", "Input text to print.")
)
func init() {
- // DoFns should be registered with Beam.
- beam.RegisterFunction(printAndEmit)
+ // DoFns should be registered with Beam to be available in distributed runners.
+ beam.RegisterFunction(toTitleCase)
+ beam.RegisterFunction(logAndEmit)
}
-func printAndEmit(element string, emit func(string)) {
- fmt.Println(element)
+// A simple function takes the element as an argument and returns a single value.
+func toTitleCase(element string) string {
+ return strings.Title(element)
+}
+
+// A function can also take a context.Context and an "emit" function to output zero or more values per element, like FlatMap.
+func logAndEmit(ctx context.Context, element string, emit func(string)) {
+ beamLog.Infoln(ctx, element)
emit(element)
}
-func myPipeline(scope beam.Scope, input_text string) beam.PCollection {
- elements := beam.Create(scope, "Hello", "World!", input_text)
- return beam.ParDo(scope, printAndEmit, elements)
+func MyPipeline(scope beam.Scope, input_text string) beam.PCollection {
+ elements := beam.Create(scope, "hello", "world!", input_text)
+ elements = beam.ParDo(scope, toTitleCase, elements)
+ return beam.ParDo(scope, logAndEmit, elements)
}
func main() {
@@ -35,7 +44,7 @@ func main() {
ctx := context.Background()
pipeline, scope := beam.NewPipelineWithRoot()
- my_pipeline(scope, *input_text)
+ MyPipeline(scope, *input_text)
// Run the pipeline. You can specify your runner with the --runner flag.
if err := beamx.Run(ctx, pipeline); err != nil {
diff --git a/main_test.go b/main_test.go
index f4f95ed..77848f7 100644
--- a/main_test.go
+++ b/main_test.go
@@ -14,7 +14,7 @@ func TestPipeline(t *testing.T) {
pipeline, scope := beam.NewPipelineWithRoot()
// Note that the order of the elements doesn't matter.
- elements := my_pipeline(scope, "Test")
+ elements := MyPipeline(scope, "Test")
passert.Equals(scope, elements, "Test", "Hello", "World!")
ptest.RunAndValidate(t, pipeline)
}