You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/07/13 01:05:32 UTC
[beam] branch master updated: Move Xlang Go examples to generic registration (#22249)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2c8e7eb7a39 Move Xlang Go examples to generic registration (#22249)
2c8e7eb7a39 is described below
commit 2c8e7eb7a39cbe3a1678a5c6b8b3f8700d4d8706
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Tue Jul 12 21:05:22 2022 -0400
Move Xlang Go examples to generic registration (#22249)
* WordCount (WC) generics
* Partition generics
* GroupBy generics
* Flatten generics
* combine_globally generics
* Combine generics
* CoGBK generics
* BigQuery generics
* Remove old versions
---
sdks/go/examples/xlang/bigquery/wordcount.go | 6 ++----
sdks/go/examples/xlang/cogroup_by/cogroup_by.go | 12 +++++++-----
sdks/go/examples/xlang/combine/combine.go | 12 +++++++-----
sdks/go/examples/xlang/combine_globally/combine_globally.go | 3 ++-
sdks/go/examples/xlang/flatten/flatten.go | 3 ++-
sdks/go/examples/xlang/group_by/group_by.go | 12 +++++++-----
sdks/go/examples/xlang/partition/partition.go | 3 ++-
sdks/go/examples/xlang/wordcount/wordcount.go | 7 +++++--
8 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/sdks/go/examples/xlang/bigquery/wordcount.go b/sdks/go/examples/xlang/bigquery/wordcount.go
index 9500d2ceb5d..5a30423be88 100644
--- a/sdks/go/examples/xlang/bigquery/wordcount.go
+++ b/sdks/go/examples/xlang/bigquery/wordcount.go
@@ -96,6 +96,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
@@ -110,10 +111,7 @@ var (
)
func init() {
- beam.RegisterType(reflect.TypeOf((*ShakesRow)(nil)))
- beam.RegisterType(reflect.TypeOf((*WordsCombine)(nil)))
- beam.RegisterType(reflect.TypeOf((*CountsRow)(nil)))
- beam.RegisterType(reflect.TypeOf((*WordsAccum)(nil)))
+ register.Combiner3[WordsAccum, ShakesRow, CountsRow](&WordsCombine{})
}
// ShakesRow is a struct corresponding to the schema of the Shakespeare input table. In order to
diff --git a/sdks/go/examples/xlang/cogroup_by/cogroup_by.go b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
index 9a98c4ae97c..717139af0cf 100644
--- a/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
+++ b/sdks/go/examples/xlang/cogroup_by/cogroup_by.go
@@ -27,11 +27,11 @@ import (
"flag"
"fmt"
"log"
- "reflect"
"sort"
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -72,10 +72,12 @@ func sumCounts(key int64, iter1 func(*string) bool) (int64, []string) {
}
func init() {
- beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
- beam.RegisterFunction(formatFn)
- beam.RegisterFunction(getKV)
- beam.RegisterFunction(sumCounts)
+ register.Function2x1(formatFn)
+ register.Function2x0(getKV)
+ register.Function2x2(sumCounts)
+
+ register.Emitter2[int64, string]()
+ register.Iter1[string]()
}
func main() {
diff --git a/sdks/go/examples/xlang/combine/combine.go b/sdks/go/examples/xlang/combine/combine.go
index d522dfaa130..b8588fc2817 100644
--- a/sdks/go/examples/xlang/combine/combine.go
+++ b/sdks/go/examples/xlang/combine/combine.go
@@ -27,10 +27,10 @@ import (
"flag"
"fmt"
"log"
- "reflect"
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -68,10 +68,12 @@ func sumCounts(key string, iter func(*int64) bool) (string, int64) {
}
func init() {
- beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
- beam.RegisterFunction(formatFn)
- beam.RegisterFunction(getKV)
- beam.RegisterFunction(sumCounts)
+ register.Function2x1(formatFn)
+ register.Function2x0(getKV)
+ register.Function2x2(sumCounts)
+
+ register.Emitter2[string, int64]()
+ register.Iter1[int64]()
}
func main() {
diff --git a/sdks/go/examples/xlang/combine_globally/combine_globally.go b/sdks/go/examples/xlang/combine_globally/combine_globally.go
index cdb9a6977aa..942e562e1ce 100644
--- a/sdks/go/examples/xlang/combine_globally/combine_globally.go
+++ b/sdks/go/examples/xlang/combine_globally/combine_globally.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
}
func init() {
- beam.RegisterFunction(formatFn)
+ register.Function1x1(formatFn)
}
func main() {
diff --git a/sdks/go/examples/xlang/flatten/flatten.go b/sdks/go/examples/xlang/flatten/flatten.go
index ab3818187bc..9a4d5475956 100644
--- a/sdks/go/examples/xlang/flatten/flatten.go
+++ b/sdks/go/examples/xlang/flatten/flatten.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
}
func init() {
- beam.RegisterFunction(formatFn)
+ register.Function1x1(formatFn)
}
func main() {
diff --git a/sdks/go/examples/xlang/group_by/group_by.go b/sdks/go/examples/xlang/group_by/group_by.go
index 6748fc26105..c052c6748ba 100644
--- a/sdks/go/examples/xlang/group_by/group_by.go
+++ b/sdks/go/examples/xlang/group_by/group_by.go
@@ -27,11 +27,11 @@ import (
"flag"
"fmt"
"log"
- "reflect"
"sort"
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -71,10 +71,12 @@ func collectValues(key string, iter func(*int64) bool) (string, []int) {
}
func init() {
- beam.RegisterType(reflect.TypeOf((*KV)(nil)).Elem())
- beam.RegisterFunction(formatFn)
- beam.RegisterFunction(getKV)
- beam.RegisterFunction(collectValues)
+ register.Function2x1(formatFn)
+ register.Function2x0(getKV)
+ register.Function2x2(collectValues)
+
+ register.Emitter2[string, int64]()
+ register.Iter1[int64]()
}
func main() {
diff --git a/sdks/go/examples/xlang/partition/partition.go b/sdks/go/examples/xlang/partition/partition.go
index 65a0b3aa3a8..ac77aa09391 100644
--- a/sdks/go/examples/xlang/partition/partition.go
+++ b/sdks/go/examples/xlang/partition/partition.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -49,7 +50,7 @@ func formatFn(c int64) string {
}
func init() {
- beam.RegisterFunction(formatFn)
+ register.Function1x1(formatFn)
}
func main() {
diff --git a/sdks/go/examples/xlang/wordcount/wordcount.go b/sdks/go/examples/xlang/wordcount/wordcount.go
index a73dff52052..5d9eb11f810 100644
--- a/sdks/go/examples/xlang/wordcount/wordcount.go
+++ b/sdks/go/examples/xlang/wordcount/wordcount.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/examples/xlang"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
@@ -69,8 +70,10 @@ func formatFn(w string, c int64) string {
}
func init() {
- beam.RegisterFunction(extractFn)
- beam.RegisterFunction(formatFn)
+ register.Function3x0(extractFn)
+ register.Function2x1(formatFn)
+
+ register.Emitter1[string]()
}
func main() {