You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/27 17:37:49 UTC

[beam] branch master updated: [Go SDK] Fix multimap support for the direct runner (#24775)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 96f4391cc61 [Go SDK] Fix multimap support for the direct runner (#24775)
96f4391cc61 is described below

commit 96f4391cc614364b3432b2287e97dd103d23469d
Author: camphillips22 <ca...@fullstory.com>
AuthorDate: Tue Dec 27 12:37:40 2022 -0500

    [Go SDK] Fix multimap support for the direct runner (#24775)
    
    Updates the `NewKeyedIterable()` implementation to correctly
    parse a multimap side input from an in-memory ReStream.
---
 sdks/go/pkg/beam/runners/direct/buffer.go      | 17 ++++++++++++++-
 sdks/go/pkg/beam/runners/direct/direct_test.go | 30 ++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go
index e831930a618..383db3db306 100644
--- a/sdks/go/pkg/beam/runners/direct/buffer.go
+++ b/sdks/go/pkg/beam/runners/direct/buffer.go
@@ -72,7 +72,22 @@ func (n *buffer) NewIterable(ctx context.Context, reader exec.StateReader, w typ
 }
 
 func (n *buffer) NewKeyedIterable(ctx context.Context, reader exec.StateReader, w typex.Window, iterKey any) (exec.ReStream, error) {
-	return n.NewIterable(ctx, reader, w)
+	if !n.done {
+		panic(fmt.Sprintf("buffer[%v] incomplete: %v", n.uid, len(n.buf)))
+	}
+	s := &exec.FixedReStream{Buf: make([]exec.FullValue, 0)}
+	for _, v := range n.buf {
+		if v.Elm == iterKey {
+			s.Buf = append(s.Buf, exec.FullValue{
+				Elm:          v.Elm2,
+				Timestamp:    v.Timestamp,
+				Windows:      v.Windows,
+				Pane:         v.Pane,
+				Continuation: v.Continuation,
+			})
+		}
+	}
+	return s, nil
 }
 
 func (n *buffer) String() string {
diff --git a/sdks/go/pkg/beam/runners/direct/direct_test.go b/sdks/go/pkg/beam/runners/direct/direct_test.go
index ac1eeecb64b..a8108580aa2 100644
--- a/sdks/go/pkg/beam/runners/direct/direct_test.go
+++ b/sdks/go/pkg/beam/runners/direct/direct_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
 	"github.com/google/go-cmp/cmp"
 )
 
@@ -44,6 +45,7 @@ func init() {
 	beam.RegisterFunction(dofn2x1)
 	beam.RegisterFunction(dofn3x1)
 	beam.RegisterFunction(dofn2x2KV)
+	beam.RegisterFunction(dofnMultiMap)
 	beam.RegisterFunction(dofn2)
 	beam.RegisterFunction(dofnKV)
 	beam.RegisterFunction(dofnKV2)
@@ -121,6 +123,16 @@ func dofn2x2KV(imp []byte, iter func(*string, *int64) bool, emitK func(string),
 	emitV(sum)
 }
 
+func dofnMultiMap(key string, lookup func(string) func(*int64) bool, emitK func(string), emitV func(int64)) {
+	var v, sum int64
+	iter := lookup(key)
+	for iter(&v) {
+		sum += v
+	}
+	emitK(key)
+	emitV(sum)
+}
+
 // int64Check validates that within a single bundle,
 // we received the expected int64 values.
 type int64Check struct {
@@ -446,6 +458,24 @@ func TestRunner_Pipelines(t *testing.T) {
 			t.Fatal(err)
 		}
 	})
+	t.Run("sideinput_multimap", func(t *testing.T) {
+		p, s := beam.NewPipelineWithRoot()
+		imp := beam.Impulse(s)
+		col1 := beam.ParDo(s, dofnKV, imp)
+		keys := filter.Distinct(s, beam.DropValue(s, col1))
+		ks, sum := beam.ParDo2(s, dofnMultiMap, keys, beam.SideInput{Input: col1})
+		beam.ParDo(s, &stringCheck{
+			Name: "iterKV sideinput check K",
+			Want: []string{"a", "b"},
+		}, ks)
+		beam.ParDo(s, &int64Check{
+			Name: "iterKV sideinput check V",
+			Want: []int{9, 12},
+		}, sum)
+		if _, err := executeWithT(context.Background(), t, p); err != nil {
+			t.Fatal(err)
+		}
+	})
 	// Validates the waiting on side input readiness in buffer.
 	t.Run("sideinput_2iterable", func(t *testing.T) {
 		p, s := beam.NewPipelineWithRoot()