You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/22 18:24:54 UTC

[beam] branch master updated: [BEAM-4472] Fix Top accum coder to be liftable.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d98eabf  [BEAM-4472] Fix Top accum coder to be liftable.
     new 493282a  Merge pull request #7586 from lostluck/topcoder
d98eabf is described below

commit d98eabf3a4d8ad59491b68491bea460c7f3de170
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Tue Jan 22 18:02:06 2019 +0000

    [BEAM-4472] Fix Top accum coder to be liftable.
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go | 18 -----
 sdks/go/pkg/beam/transforms/top/top.go            | 85 ++++++++++++++++++-----
 sdks/go/pkg/beam/transforms/top/top.shims.go      | 56 +++++++--------
 sdks/go/pkg/beam/transforms/top/top_test.go       | 55 ++++++++++-----
 4 files changed, 134 insertions(+), 80 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 9d6bb34..8e22228 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -158,9 +158,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
 	}
 
 	edge := s.Edges[1].Edge
-	if !tryAddingCoder(edge.AccumCoder) {
-		return
-	}
 	acID := m.coders.Add(edge.AccumCoder)
 	payload := &pb.CombinePayload{
 		CombineFn: &pb.SdkFunctionSpec{
@@ -175,21 +172,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
 	transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
 }
 
-// If the accumulator type is unencodable (eg. contains raw interface{})
-// Try encoding the AccumCoder. If the marshaller doesn't panic, it's
-// encodable.
-func tryAddingCoder(c *coder.Coder) (ok bool) {
-	defer func() {
-		if p := recover(); p != nil {
-			ok = false
-			fmt.Printf("Unable to encode combiner for lifting: %v", p)
-		}
-	}()
-	// Try in a new Marshaller to not corrupt state.
-	NewCoderMarshaller().Add(c)
-	return true
-}
-
 func (m *marshaller) addMultiEdge(edge NamedEdge) string {
 	id := edgeID(edge.Edge)
 	if _, exists := m.transforms[id]; exists {
diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go
index 3c6e04b..8b7edfd 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -18,6 +18,8 @@
 package top
 
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
 	"sort"
 
@@ -50,7 +52,7 @@ func Largest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.P
 	t := beam.ValidateNonCompositeType(col)
 	validate(t, n, less)
 
-	return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n}, col)
+	return beam.Combine(s, newCombineFn(less, n, t, false), col)
 }
 
 // LargestPerKey returns the largest N values for each key of a PCollection<KV<K,T>>.
@@ -63,7 +65,7 @@ func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{})
 	_, t := beam.ValidateKVType(col)
 	validate(t, n, less)
 
-	return beam.CombinePerKey(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n}, col)
+	return beam.CombinePerKey(s, newCombineFn(less, n, t, false), col)
 }
 
 // Smallest returns the smallest N elements of a PCollection<T>. The order is
@@ -81,7 +83,7 @@ func Smallest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.
 	t := beam.ValidateNonCompositeType(col)
 	validate(t, n, less)
 
-	return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Reversed: true}, col)
+	return beam.Combine(s, newCombineFn(less, n, t, true), col)
 }
 
 // SmallestPerKey returns the smallest N values for each key of a PCollection<KV<K,T>>.
@@ -94,7 +96,7 @@ func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{})
 	_, t := beam.ValidateKVType(col)
 	validate(t, n, less)
 
-	return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Reversed: true}, col)
+	return beam.CombinePerKey(s, newCombineFn(less, n, t, true), col)
 }
 
 func validate(t typex.FullType, n int, less interface{}) {
@@ -104,17 +106,61 @@ func validate(t typex.FullType, n int, less interface{}) {
 	funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type()))
 }
 
-// TODO(herohde) 5/25/2017: BEAM-4472 the accumulator should be serializable
-// with a Coder. We need a coder here, because the elements are generally
-// code-able only. Until then, it does not support combiner lifting.
+func newCombineFn(less interface{}, n int, t typex.FullType, reversed bool) *combineFn {
+	elemCoder := beam.EncodedCoder{Coder: beam.NewCoder(t)}
+	return &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Reversed: reversed, Coder: elemCoder}
+}
 
 // TODO(herohde) 5/25/2017: use a heap instead of a sorted slice.
 
 type accum struct {
+	coder beam.Coder
+	data  [][]byte
 	// list stores the elements of type A in order. It has at most size N.
 	list []interface{}
 }
 
+// UnmarshalJSON allows accum to hook into the JSON decoder and deserialize
+// its own representation. Only the raw encoded elements are stored in data;
+// unmarshal decodes them into list once the element coder has been set.
+func (a *accum) UnmarshalJSON(b []byte) error {
+	return json.Unmarshal(b, &a.data)
+}
+
+// unmarshal decodes pending encoded elements in data, appends them to list, then clears data.
+func (a *accum) unmarshal() error {
+	if a.data != nil {
+		decoder := exec.MakeElementDecoder(beam.UnwrapCoder(a.coder))
+		for i := range a.data {
+			fv, err := decoder.Decode(bytes.NewBuffer(a.data[i]))
+			if err != nil {
+				return fmt.Errorf("top.accum: error unmarshal: %v", err)
+			}
+			a.list = append(a.list, fv.Elm)
+		}
+		a.data = nil
+	}
+	return nil
+}
+
+// MarshalJSON hooks into the JSON encoder: each element in list is encoded with
+// the element coder, and the resulting byte slices are serialized as a JSON array.
+func (a accum) MarshalJSON() ([]byte, error) {
+	if !a.coder.IsValid() {
+		return nil, fmt.Errorf("top.accum: element coder unspecified")
+	}
+	enc := exec.MakeElementEncoder(beam.UnwrapCoder(a.coder))
+	var values [][]byte
+	for _, value := range a.list {
+		var buf bytes.Buffer
+		if err := enc.Encode(exec.FullValue{Elm: value}, &buf); err != nil {
+			return nil, fmt.Errorf("top.accum: marshalling of %v failed: %v", value, err)
+		}
+		values = append(values, buf.Bytes())
+	}
+	return json.Marshal(values)
+}
+
 // combineFn is the internal CombineFn. It maintains accumulators containing
 // sorted lists of element of the underlying type, A, up to size N, under the
 // Less ordering on A. The natural order maintains the largest elements.
@@ -125,27 +171,32 @@ type combineFn struct {
 	Reversed bool `json:"reversed"`
 	// N is the number of elements to keep.
 	N int `json:"n"`
+	// Coder is the element coder for the underlying type, A.
+	Coder beam.EncodedCoder `json:"coder"`
 
 	less reflectx.Func2x1
 }
 
-// TODO(herohde) 5/25/2017: a Setup/Init method would be useful.
-
 func (f *combineFn) CreateAccumulator() accum {
-	return accum{}
+	return accum{coder: f.Coder.Coder}
 }
 
 func (f *combineFn) AddInput(a accum, val beam.T) accum {
-	t := f.Less.Fn.Type().In(0)                 // == underlying type, A
-	ret := append(a.list, exec.Convert(val, t)) // unwrap T
+	ret := append(a.list, val)
 	return f.trim(ret)
 }
 
-func (f *combineFn) MergeAccumulators(list []accum) accum {
-	var ret []interface{}
-	for _, a := range list {
-		ret = append(ret, a.list...)
+func (f *combineFn) MergeAccumulators(a, b accum) accum {
+	a.coder = f.Coder.Coder
+	b.coder = f.Coder.Coder
+	if err := a.unmarshal(); err != nil {
+		panic(err)
+	}
+	if err := b.unmarshal(); err != nil {
+		panic(err)
 	}
+	// Clip a.list so append/trim never touch a backing array shared with the caller.
+	ret := append(a.list[:len(a.list):len(a.list)], b.list...)
 	return f.trim(ret)
 }
 
@@ -174,5 +225,5 @@ func (f *combineFn) trim(ret []interface{}) accum {
 	if len(ret) > f.N {
 		ret = ret[:f.N]
 	}
-	return accum{list: ret}
+	return accum{coder: f.Coder.Coder, list: ret}
 }
diff --git a/sdks/go/pkg/beam/transforms/top/top.shims.go b/sdks/go/pkg/beam/transforms/top/top.shims.go
index 9228104..8031bda 100644
--- a/sdks/go/pkg/beam/transforms/top/top.shims.go
+++ b/sdks/go/pkg/beam/transforms/top/top.shims.go
@@ -32,9 +32,9 @@ func init() {
 	runtime.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
 	runtime.RegisterType(reflect.TypeOf((*typex.T)(nil)).Elem())
 	reflectx.RegisterStructWrapper(reflect.TypeOf((*combineFn)(nil)).Elem(), wrapMakerCombineFn)
+	reflectx.RegisterFunc(reflect.TypeOf((*func(accum,accum) (accum))(nil)).Elem(), funcMakerAccumAccumГAccum)
 	reflectx.RegisterFunc(reflect.TypeOf((*func(accum,typex.T) (accum))(nil)).Elem(), funcMakerAccumTypex۰TГAccum)
 	reflectx.RegisterFunc(reflect.TypeOf((*func(accum) ([]typex.T))(nil)).Elem(), funcMakerAccumГSliceofTypex۰T)
-	reflectx.RegisterFunc(reflect.TypeOf((*func([]accum) (accum))(nil)).Elem(), funcMakerSliceofAccumГAccum)
 	reflectx.RegisterFunc(reflect.TypeOf((*func() (accum))(nil)).Elem(), funcMakerГAccum)
 }
 
@@ -44,10 +44,36 @@ func wrapMakerCombineFn(fn interface{}) map[string]reflectx.Func {
 		"AddInput": reflectx.MakeFunc(func(a0 accum, a1 typex.T) (accum) { return dfn.AddInput(a0, a1) }),
 		"CreateAccumulator": reflectx.MakeFunc(func() (accum) { return dfn.CreateAccumulator() }),
 		"ExtractOutput": reflectx.MakeFunc(func(a0 accum) ([]typex.T) { return dfn.ExtractOutput(a0) }),
-		"MergeAccumulators": reflectx.MakeFunc(func(a0 []accum) (accum) { return dfn.MergeAccumulators(a0) }),
+		"MergeAccumulators": reflectx.MakeFunc(func(a0 accum, a1 accum) (accum) { return dfn.MergeAccumulators(a0, a1) }),
 	}
 }
 
+type callerAccumAccumГAccum struct {
+	fn func(accum,accum) (accum)
+}
+
+func funcMakerAccumAccumГAccum(fn interface{}) reflectx.Func {
+	f := fn.(func(accum,accum) (accum))
+	return &callerAccumAccumГAccum{fn: f}
+}
+
+func (c *callerAccumAccumГAccum) Name() string {
+	return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerAccumAccumГAccum) Type() reflect.Type {
+	return reflect.TypeOf(c.fn)
+}
+
+func (c *callerAccumAccumГAccum) Call(args []interface{}) []interface{} {
+	out0 := c.fn(args[0].(accum), args[1].(accum))
+	return []interface{}{out0}
+}
+
+func (c *callerAccumAccumГAccum) Call2x1(arg0, arg1 interface{}) (interface{}) {
+	return c.fn(arg0.(accum), arg1.(accum))
+}
+
 type callerAccumTypex۰TГAccum struct {
 	fn func(accum,typex.T) (accum)
 }
@@ -100,32 +126,6 @@ func (c *callerAccumГSliceofTypex۰T) Call1x1(arg0 interface{}) (interface{}) {
 	return c.fn(arg0.(accum))
 }
 
-type callerSliceofAccumГAccum struct {
-	fn func([]accum) (accum)
-}
-
-func funcMakerSliceofAccumГAccum(fn interface{}) reflectx.Func {
-	f := fn.(func([]accum) (accum))
-	return &callerSliceofAccumГAccum{fn: f}
-}
-
-func (c *callerSliceofAccumГAccum) Name() string {
-	return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerSliceofAccumГAccum) Type() reflect.Type {
-	return reflect.TypeOf(c.fn)
-}
-
-func (c *callerSliceofAccumГAccum) Call(args []interface{}) []interface{} {
-	out0 := c.fn(args[0].([]accum))
-	return []interface{}{out0}
-}
-
-func (c *callerSliceofAccumГAccum) Call1x1(arg0 interface{}) (interface{}) {
-	return c.fn(arg0.([]accum))
-}
-
 type callerГAccum struct {
 	fn func() (accum)
 }
diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go
index 178965c..76130ec 100644
--- a/sdks/go/pkg/beam/transforms/top/top_test.go
+++ b/sdks/go/pkg/beam/transforms/top/top_test.go
@@ -16,10 +16,12 @@
 package top
 
 import (
+	"fmt"
 	"reflect"
 	"testing"
 
-	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
@@ -29,7 +31,7 @@ func TestCombineFn3String(t *testing.T) {
 	less := func(a, b string) bool {
 		return len(a) < len(b)
 	}
-	fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}}
+	fn := newCombineFn(less, 3, typex.New(reflectx.String), false)
 
 	tests := []struct {
 		Elms     []string
@@ -52,12 +54,12 @@ func TestCombineFn3String(t *testing.T) {
 }
 
 // TestCombineFn3RevString verifies that the accumulator correctly
-// maintains the top 3 shorest strings.
+// maintains the top 3 shortest strings.
 func TestCombineFn3RevString(t *testing.T) {
 	less := func(a, b string) bool {
 		return len(a) < len(b)
 	}
-	fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, Reversed: true}
+	fn := newCombineFn(less, 3, typex.New(reflectx.String), true)
 
 	tests := []struct {
 		Elms     []string
@@ -84,28 +86,30 @@ func TestCombineFnMerge(t *testing.T) {
 	less := func(a, b string) bool {
 		return len(a) < len(b)
 	}
-	fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}}
-
+	fn := newCombineFn(less, 3, typex.New(reflectx.String), false)
 	tests := []struct {
 		Elms     [][]string
 		Expected []string
 	}{
 		{[][]string{nil}, nil},
 		{[][]string{{"foo"}}, []string{"foo"}},
-		{[][]string{{"1", "2"}, {"3"}, {"4", "5"}}, []string{"1", "2", "3"}},
+		{[][]string{{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, []string{"1", "2", "3"}},
 		{[][]string{{"a1"}, {"b22", "c22"}, {"d333"}, {"e22"}}, []string{"d333", "b22", "c22"}},
+		{[][]string{{"a55555"}, {"b22", "c4444"}, {"d333"}, {"e22"}}, []string{"a55555", "c4444", "d333"}},
 	}
 
-	for _, test := range tests {
-		var list []accum
-		for _, a := range test.Elms {
-			list = append(list, load(fn, a...))
-		}
-
-		actual := output(fn, fn.MergeAccumulators(list))
-		if !reflect.DeepEqual(actual, test.Expected) {
-			t.Errorf("CombineFn(3; %v) = %v, want %v", test.Elms, actual, test.Expected)
-		}
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("%02d", i), func(t *testing.T) {
+			var list []accum
+			for _, a := range test.Elms {
+				list = append(list, load(fn, a...))
+			}
+			a := merge(t, fn, list...)
+			actual := output(fn, a)
+			if !reflect.DeepEqual(actual, test.Expected) {
+				t.Errorf("CombineFn(3; %v) = %v, want %v", test.Elms, actual, test.Expected)
+			}
+		})
 	}
 }
 
@@ -117,6 +121,23 @@ func load(fn *combineFn, elms ...string) accum {
 	return a
 }
 
+func merge(t *testing.T, fn *combineFn, as ...accum) accum {
+	t.Helper()
+	merged := fn.CreateAccumulator()
+	for i := range as {
+		raw, err := as[i].MarshalJSON() // round-trip through JSON, as a lifted combine would
+		if err != nil {
+			t.Fatalf("failure marshalling accum[%d]: %v, %+v", i, err, as[i])
+		}
+		var decoded accum
+		if err := decoded.UnmarshalJSON(raw); err != nil {
+			t.Fatalf("failure unmarshalling accum[%d]: %v, %+v", i, err, as[i])
+		}
+		merged = fn.MergeAccumulators(merged, decoded)
+	}
+	return merged
+}
+
 func output(fn *combineFn, a accum) []string {
 	var ret []string
 	for _, actual := range fn.ExtractOutput(a) {