You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/06/22 23:20:35 UTC

[beam-starter-go] 13/17: address review comments

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-starter-go.git

commit 5d5be4b3f099e7b1e3336d56713f7710ad573b87
Author: David Cavazos <dc...@google.com>
AuthorDate: Fri Jun 17 14:05:25 2022 -0700

    address review comments
---
 .github/workflows/test.yaml |  2 +-
 main.go                     | 29 +++++++++++++++++++----------
 main_test.go                |  2 +-
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index e5e3684..4f81103 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -17,6 +17,6 @@ jobs:
     - uses: actions/checkout@v3
     - uses: actions/setup-go@v3
       with:
-        go-version: '>=1.18.0'
+        go-version: go.mod
     - run: go version
     - run: go test ./...
diff --git a/main.go b/main.go
index 41c3c27..9c513c5 100644
--- a/main.go
+++ b/main.go
@@ -3,30 +3,39 @@ package main
 import (
 	"context"
 	"flag"
-	"fmt"
 	"log"
+	"strings"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 )
 
 var (
-	input_text = flag.String("input-text", "Default input text", "Input text to print.")
+	input_text = flag.String("input-text", "default input text", "Input text to print.")
 )
 
 func init() {
-	// DoFns should be registered with Beam.
-	beam.RegisterFunction(printAndEmit)
+	// DoFns should be registered with Beam to be available in distributed runners.
+	beam.RegisterFunction(toTitleCase)
+	beam.RegisterFunction(logAndEmit)
 }
 
-func printAndEmit(element string, emit func(string)) {
-	fmt.Println(element)
+// A simple function takes the element as an argument and returns a single value.
+func toTitleCase(element string) string {
+	return strings.Title(element)
+}
+
+// A function can also accept the Context and an "emit" callback to output zero or more values, like FlatMap.
+func logAndEmit(ctx context.Context, element string, emit func(string)) {
+	beamLog.Infoln(ctx, element)
 	emit(element)
 }
 
-func myPipeline(scope beam.Scope, input_text string) beam.PCollection {
-	elements := beam.Create(scope, "Hello", "World!", input_text)
-	return beam.ParDo(scope, printAndEmit, elements)
+func MyPipeline(scope beam.Scope, input_text string) beam.PCollection {
+	elements := beam.Create(scope, "hello", "world!", input_text)
+	elements = beam.ParDo(scope, toTitleCase, elements)
+	return beam.ParDo(scope, logAndEmit, elements)
 }
 
 func main() {
@@ -35,7 +44,7 @@ func main() {
 
 	ctx := context.Background()
 	pipeline, scope := beam.NewPipelineWithRoot()
-	my_pipeline(scope, *input_text)
+	MyPipeline(scope, *input_text)
 
 	// Run the pipeline. You can specify your runner with the --runner flag.
 	if err := beamx.Run(ctx, pipeline); err != nil {
diff --git a/main_test.go b/main_test.go
index f4f95ed..77848f7 100644
--- a/main_test.go
+++ b/main_test.go
@@ -14,7 +14,7 @@ func TestPipeline(t *testing.T) {
 	pipeline, scope := beam.NewPipelineWithRoot()
 
 	// Note that the order of the elements doesn't matter.
-	elements := my_pipeline(scope, "Test")
+	elements := MyPipeline(scope, "Test")
 	passert.Equals(scope, elements, "Test", "Hello", "World!")
 	ptest.RunAndValidate(t, pipeline)
 }