You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/27 17:37:49 UTC
[beam] branch master updated: [Go SDK] Fix multimap support for the direct runner (#24775)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 96f4391cc61 [Go SDK] Fix multimap support for the direct runner (#24775)
96f4391cc61 is described below
commit 96f4391cc614364b3432b2287e97dd103d23469d
Author: camphillips22 <ca...@fullstory.com>
AuthorDate: Tue Dec 27 12:37:40 2022 -0500
[Go SDK] Fix multimap support for the direct runner (#24775)
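Updates the `NewKeyedIterable()` implementation to correctly
parse a multimap side input from an in-memory ReStream. Previously it
ignored the lookup key and delegated to `NewIterable()`; it now returns
only the values of the buffered KV pairs whose key matches the lookup key.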
---
sdks/go/pkg/beam/runners/direct/buffer.go | 17 ++++++++++++++-
sdks/go/pkg/beam/runners/direct/direct_test.go | 30 ++++++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go
index e831930a618..383db3db306 100644
--- a/sdks/go/pkg/beam/runners/direct/buffer.go
+++ b/sdks/go/pkg/beam/runners/direct/buffer.go
@@ -72,7 +72,22 @@ func (n *buffer) NewIterable(ctx context.Context, reader exec.StateReader, w typ
}
func (n *buffer) NewKeyedIterable(ctx context.Context, reader exec.StateReader, w typex.Window, iterKey any) (exec.ReStream, error) {
- return n.NewIterable(ctx, reader, w)
+ if !n.done { // reading before the buffer is marked done is a programming error: panic
+ panic(fmt.Sprintf("buffer[%v] incomplete: %v", n.uid, len(n.buf)))
+ }
+ s := &exec.FixedReStream{Buf: make([]exec.FullValue, 0)} // empty (non-nil) stream when no element matches iterKey
+ for _, v := range n.buf {
+ if v.Elm == iterKey { // NOTE(review): interface == panics at runtime when both keys hold the same uncomparable type (e.g. []byte, structs with slices); consider reflect.DeepEqual or comparing encoded keys
+ s.Buf = append(s.Buf, exec.FullValue{
+ Elm: v.Elm2, // emit only the value half of the matching KV pair
+ Timestamp: v.Timestamp,
+ Windows: v.Windows,
+ Pane: v.Pane,
+ Continuation: v.Continuation,
+ })
+ }
+ }
+ return s, nil // the window w is not used to filter here
}
func (n *buffer) String() string {
diff --git a/sdks/go/pkg/beam/runners/direct/direct_test.go b/sdks/go/pkg/beam/runners/direct/direct_test.go
index ac1eeecb64b..a8108580aa2 100644
--- a/sdks/go/pkg/beam/runners/direct/direct_test.go
+++ b/sdks/go/pkg/beam/runners/direct/direct_test.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
"github.com/google/go-cmp/cmp"
)
@@ -44,6 +45,7 @@ func init() {
beam.RegisterFunction(dofn2x1)
beam.RegisterFunction(dofn3x1)
beam.RegisterFunction(dofn2x2KV)
+ beam.RegisterFunction(dofnMultiMap)
beam.RegisterFunction(dofn2)
beam.RegisterFunction(dofnKV)
beam.RegisterFunction(dofnKV2)
@@ -121,6 +123,16 @@ func dofn2x2KV(imp []byte, iter func(*string, *int64) bool, emitK func(string),
emitV(sum)
}
+func dofnMultiMap(key string, lookup func(string) func(*int64) bool, emitK func(string), emitV func(int64)) { // test DoFn: sums the multimap side-input values for key and emits (key, sum)
+ var v, sum int64
+ iter := lookup(key) // multimap lookup: iterator over the side-input values paired with key
+ for iter(&v) {
+ sum += v
+ }
+ emitK(key) // key and sum go to two separate outputs (consumed via beam.ParDo2)
+ emitV(sum)
+}
+
// int64Check validates that within a single bundle,
// we received the expected int64 values.
type int64Check struct {
@@ -446,6 +458,24 @@ func TestRunner_Pipelines(t *testing.T) {
t.Fatal(err)
}
})
+ t.Run("sideinput_multimap", func(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+ imp := beam.Impulse(s)
+ col1 := beam.ParDo(s, dofnKV, imp)
+ keys := filter.Distinct(s, beam.DropValue(s, col1))
+ ks, sum := beam.ParDo2(s, dofnMultiMap, keys, beam.SideInput{Input: col1})
+ beam.ParDo(s, &stringCheck{
+ Name: "iterKV sideinput check K",
+ Want: []string{"a", "b"},
+ }, ks)
+ beam.ParDo(s, &int64Check{
+ Name: "iterKV sideinput check V",
+ Want: []int{9, 12},
+ }, sum)
+ if _, err := executeWithT(context.Background(), t, p); err != nil {
+ t.Fatal(err)
+ }
+ })
// Validates the waiting on side input readiness in buffer.
t.Run("sideinput_2iterable", func(t *testing.T) {
p, s := beam.NewPipelineWithRoot()