You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/07/13 01:05:32 UTC

[beam] branch master updated: Move Xlang Go examples to generic registration (#22249)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c8e7eb7a39 Move Xlang Go examples to generic registration (#22249)
2c8e7eb7a39 is described below

commit 2c8e7eb7a39cbe3a1678a5c6b8b3f8700d4d8706
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Tue Jul 12 21:05:22 2022 -0400

    Move Xlang Go examples to generic registration (#22249)
    
    * WC generics
    
    * Partition generics
    
    * GroupBy generics
    
    * Flatten generics
    
    * combine_globally generics
    
    * Combine generics
    
    * CoGBK generics
    
    * BigQuery generics
    
    * Remove old versions
---
 sdks/go/examples/xlang/bigquery/wordcount.go                |  6 ++----
 sdks/go/examples/xlang/cogroup_by/cogroup_by.go             | 12 +++++++-----
 sdks/go/examples/xlang/combine/combine.go                   | 12 +++++++-----
 sdks/go/examples/xlang/combine_globally/combine_globally.go |  3 ++-
 sdks/go/examples/xlang/flatten/flatten.go                   |  3 ++-
 sdks/go/examples/xlang/group_by/group_by.go                 | 12 +++++++-----
 sdks/go/examples/xlang/partition/partition.go               |  3 ++-
 sdks/go/examples/xlang/wordcount/wordcount.go               |  7 +++++--
 8 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
index 9500d2ceb5d..5a30423be88 100644
--- a/sdks/go/examples/xlang/bigquery/wordcount.go
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -96,6 +96,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 )
 
@@ -110,10 +111,7 @@ var (
 )
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*ShakesRow)(nil)))
-	beam.RegisterType(reflect.TypeOf((*WordsCombine)(nil)))
-	beam.RegisterType(reflect.TypeOf((*CountsRow)(nil)))
-	beam.RegisterType(reflect.TypeOf((*WordsAccum)(nil)))
+	register.Combiner3[WordsAccum, ShakesRow, CountsRow](&WordsCombine{})
 }
 
 // ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
diff --git a/sdks/go/examples/xlang/cogroup_by/cogroup_by.go b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
index 9a98c4ae97c..717139af0cf 100644
--- a/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
+++ b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
@@ -27,11 +27,11 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"reflect"
 	"sort"
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -72,10 +72,12 @@ func sumCounts(key int64, iter1 func(*string) bool) (int64, []string) {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
-	beam.RegisterFunction(formatFn)
-	beam.RegisterFunction(getKV)
-	beam.RegisterFunction(sumCounts)
+	register.Function2x1(formatFn)
+	register.Function2x0(getKV)
+	register.Function2x2(sumCounts)
+
+	register.Emitter2[int64, string]()
+	register.Iter1[string]()
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/combine/combine.go b/sdks/go/examples/xlang/combine/combine.go
index d522dfaa130..b8588fc2817 100644
--- a/sdks/go/examples/xlang/combine/combine.go
+++ b/sdks/go/examples/xlang/combine/combine.go
@@ -27,10 +27,10 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -68,10 +68,12 @@ func sumCounts(key string, iter func(*int64) bool) (string, int64) {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
-	beam.RegisterFunction(formatFn)
-	beam.RegisterFunction(getKV)
-	beam.RegisterFunction(sumCounts)
+	register.Function2x1(formatFn)
+	register.Function2x0(getKV)
+	register.Function2x2(sumCounts)
+
+	register.Emitter2[string, int64]()
+	register.Iter1[int64]()
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/combine_globally/combine_globally.go b/sdks/go/examples/xlang/combine_globally/combine_globally.go
index cdb9a6977aa..942e562e1ce 100644
--- a/sdks/go/examples/xlang/combine_globally/combine_globally.go
+++ b/sdks/go/examples/xlang/combine_globally/combine_globally.go
@@ -30,6 +30,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
 }
 
 func init() {
-	beam.RegisterFunction(formatFn)
+	register.Function1x1(formatFn)
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/flatten/flatten.go b/sdks/go/examples/xlang/flatten/flatten.go
index ab3818187bc..9a4d5475956 100644
--- a/sdks/go/examples/xlang/flatten/flatten.go
+++ b/sdks/go/examples/xlang/flatten/flatten.go
@@ -30,6 +30,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
 }
 
 func init() {
-	beam.RegisterFunction(formatFn)
+	register.Function1x1(formatFn)
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/group_by/group_by.go b/sdks/go/examples/xlang/group_by/group_by.go
index 6748fc26105..c052c6748ba 100644
--- a/sdks/go/examples/xlang/group_by/group_by.go
+++ b/sdks/go/examples/xlang/group_by/group_by.go
@@ -27,11 +27,11 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"reflect"
 	"sort"
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -71,10 +71,12 @@ func collectValues(key string, iter func(*int64) bool) (string, []int) {
 }
 
 func init() {
-	beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
-	beam.RegisterFunction(formatFn)
-	beam.RegisterFunction(getKV)
-	beam.RegisterFunction(collectValues)
+	register.Function2x1(formatFn)
+	register.Function2x0(getKV)
+	register.Function2x2(collectValues)
+
+	register.Emitter2[string, int64]()
+	register.Iter1[int64]()
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/partition/partition.go b/sdks/go/examples/xlang/partition/partition.go
index 65a0b3aa3a8..ac77aa09391 100644
--- a/sdks/go/examples/xlang/partition/partition.go
+++ b/sdks/go/examples/xlang/partition/partition.go
@@ -30,6 +30,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
 }
 
 func init() {
-	beam.RegisterFunction(formatFn)
+	register.Function1x1(formatFn)
 }
 
 func main() {
diff --git a/sdks/go/examples/xlang/wordcount/wordcount.go b/sdks/go/examples/xlang/wordcount/wordcount.go
index a73dff52052..5d9eb11f810 100644
--- a/sdks/go/examples/xlang/wordcount/wordcount.go
+++ b/sdks/go/examples/xlang/wordcount/wordcount.go
@@ -33,6 +33,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/examples/xlang"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 
@@ -69,8 +70,10 @@ func formatFn(w string, c int64) string {
 }
 
 func init() {
-	beam.RegisterFunction(extractFn)
-	beam.RegisterFunction(formatFn)
+	register.Function3x0(extractFn)
+	register.Function2x1(formatFn)
+
+	register.Emitter1[string]()
 }
 
 func main() {