You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/22 18:24:54 UTC
[beam] branch master updated: [BEAM-4472] Fix Top accum coder to be liftable.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d98eabf [BEAM-4472] Fix Top accum coder to be liftable.
new 493282a Merge pull request #7586 from lostluck/topcoder
d98eabf is described below
commit d98eabf3a4d8ad59491b68491bea460c7f3de170
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Tue Jan 22 18:02:06 2019 +0000
[BEAM-4472] Fix Top accum coder to be liftable.
---
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 18 -----
sdks/go/pkg/beam/transforms/top/top.go | 85 ++++++++++++++++++-----
sdks/go/pkg/beam/transforms/top/top.shims.go | 56 +++++++--------
sdks/go/pkg/beam/transforms/top/top_test.go | 55 ++++++++++-----
4 files changed, 134 insertions(+), 80 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 9d6bb34..8e22228 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -158,9 +158,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
}
edge := s.Edges[1].Edge
- if !tryAddingCoder(edge.AccumCoder) {
- return
- }
acID := m.coders.Add(edge.AccumCoder)
payload := &pb.CombinePayload{
CombineFn: &pb.SdkFunctionSpec{
@@ -175,21 +172,6 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
}
-// If the accumulator type is unencodable (eg. contains raw interface{})
-// Try encoding the AccumCoder. If the marshaller doesn't panic, it's
-// encodable.
-func tryAddingCoder(c *coder.Coder) (ok bool) {
- defer func() {
- if p := recover(); p != nil {
- ok = false
- fmt.Printf("Unable to encode combiner for lifting: %v", p)
- }
- }()
- // Try in a new Marshaller to not corrupt state.
- NewCoderMarshaller().Add(c)
- return true
-}
-
func (m *marshaller) addMultiEdge(edge NamedEdge) string {
id := edgeID(edge.Edge)
if _, exists := m.transforms[id]; exists {
diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go
index 3c6e04b..8b7edfd 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -18,6 +18,8 @@
package top
import (
+ "bytes"
+ "encoding/json"
"fmt"
"sort"
@@ -50,7 +52,7 @@ func Largest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.P
t := beam.ValidateNonCompositeType(col)
validate(t, n, less)
- return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n}, col)
+ return beam.Combine(s, newCombineFn(less, n, t, false), col)
}
// LargestPerKey returns the largest N values for each key of a PCollection<KV<K,T>>.
@@ -63,7 +65,7 @@ func LargestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{})
_, t := beam.ValidateKVType(col)
validate(t, n, less)
- return beam.CombinePerKey(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n}, col)
+ return beam.CombinePerKey(s, newCombineFn(less, n, t, false), col)
}
// Smallest returns the smallest N elements of a PCollection<T>. The order is
@@ -81,7 +83,7 @@ func Smallest(s beam.Scope, col beam.PCollection, n int, less interface{}) beam.
t := beam.ValidateNonCompositeType(col)
validate(t, n, less)
- return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Reversed: true}, col)
+ return beam.Combine(s, newCombineFn(less, n, t, true), col)
}
// SmallestPerKey returns the smallest N values for each key of a PCollection<KV<K,T>>.
@@ -94,7 +96,7 @@ func SmallestPerKey(s beam.Scope, col beam.PCollection, n int, less interface{})
_, t := beam.ValidateKVType(col)
validate(t, n, less)
- return beam.Combine(s, &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Reversed: true}, col)
+ return beam.Combine(s, newCombineFn(less, n, t, true), col)
}
func validate(t typex.FullType, n int, less interface{}) {
@@ -104,17 +106,61 @@ func validate(t typex.FullType, n int, less interface{}) {
funcx.MustSatisfy(less, funcx.Replace(sig, beam.TType, t.Type()))
}
-// TODO(herohde) 5/25/2017: BEAM-4472 the accumulator should be serializable
-// with a Coder. We need a coder here, because the elements are generally
-// code-able only. Until then, it does not support combiner lifting.
+func newCombineFn(less interface{}, n int, t typex.FullType, reversed bool) *combineFn {
+ coder := beam.NewCoder(t)
+ return &combineFn{Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, N: n, Coder: beam.EncodedCoder{Coder: coder}, Reversed: reversed}
+}
// TODO(herohde) 5/25/2017: use a heap instead of a sorted slice.
type accum struct {
+ coder beam.Coder
+ data [][]byte
// list stores the elements of type A in order. It has at most size N.
list []interface{}
}
+// UnmarshalJSON allows accum to hook into the JSON Decoder, and
+// deserialize its own representation.
+func (a *accum) UnmarshalJSON(b []byte) error {
+ json.Unmarshal(b, &a.data)
+ return nil
+}
+
+func (a *accum) unmarshal() error {
+ if a.data == nil {
+ return nil
+ }
+ dec := exec.MakeElementDecoder(beam.UnwrapCoder(a.coder))
+ for _, val := range a.data {
+ fv, err := dec.Decode(bytes.NewBuffer(val))
+ if err != nil {
+ return fmt.Errorf("top.accum: error unmarshal: %v", err)
+ }
+ a.list = append(a.list, fv.Elm)
+ }
+ a.data = nil
+ return nil
+}
+
+// MarshalJSON hooks into the JSON encoder, serializing each element with the accum's coder.
+func (a accum) MarshalJSON() ([]byte, error) {
+ if !a.coder.IsValid() {
+ return nil, fmt.Errorf("top.accum: element coder unspecified")
+ }
+ enc := exec.MakeElementEncoder(beam.UnwrapCoder(a.coder))
+ var values [][]byte
+ for _, value := range a.list {
+ var buf bytes.Buffer
+ if err := enc.Encode(exec.FullValue{Elm: value}, &buf); err != nil {
+ return nil, fmt.Errorf("top.accum: marshalling of %v failed: %v", value, err)
+ }
+ values = append(values, buf.Bytes())
+ }
+ a.list = nil
+ return json.Marshal(values)
+}
+
// combineFn is the internal CombineFn. It maintains accumulators containing
// sorted lists of element of the underlying type, A, up to size N, under the
// Less ordering on A. The natural order maintains the largest elements.
@@ -125,27 +171,32 @@ type combineFn struct {
Reversed bool `json:"reversed"`
// N is the number of elements to keep.
N int `json:"n"`
+ // Coder is the element coder for the underlying type, A.
+ Coder beam.EncodedCoder `json:"coder"`
less reflectx.Func2x1
}
-// TODO(herohde) 5/25/2017: a Setup/Init method would be useful.
-
func (f *combineFn) CreateAccumulator() accum {
- return accum{}
+ return accum{coder: f.Coder.Coder}
}
func (f *combineFn) AddInput(a accum, val beam.T) accum {
- t := f.Less.Fn.Type().In(0) // == underlying type, A
- ret := append(a.list, exec.Convert(val, t)) // unwrap T
+ ret := append(a.list, val)
return f.trim(ret)
}
-func (f *combineFn) MergeAccumulators(list []accum) accum {
- var ret []interface{}
- for _, a := range list {
- ret = append(ret, a.list...)
+func (f *combineFn) MergeAccumulators(a, b accum) accum {
+ a.coder = f.Coder.Coder
+ b.coder = f.Coder.Coder
+ if err := a.unmarshal(); err != nil {
+ panic(err)
+ }
+ if err := b.unmarshal(); err != nil {
+ panic(err)
}
+ var ret []interface{}
+ ret = append(a.list, b.list...)
return f.trim(ret)
}
@@ -174,5 +225,5 @@ func (f *combineFn) trim(ret []interface{}) accum {
if len(ret) > f.N {
ret = ret[:f.N]
}
- return accum{list: ret}
+ return accum{coder: f.Coder.Coder, list: ret}
}
diff --git a/sdks/go/pkg/beam/transforms/top/top.shims.go b/sdks/go/pkg/beam/transforms/top/top.shims.go
index 9228104..8031bda 100644
--- a/sdks/go/pkg/beam/transforms/top/top.shims.go
+++ b/sdks/go/pkg/beam/transforms/top/top.shims.go
@@ -32,9 +32,9 @@ func init() {
runtime.RegisterType(reflect.TypeOf((*combineFn)(nil)).Elem())
runtime.RegisterType(reflect.TypeOf((*typex.T)(nil)).Elem())
reflectx.RegisterStructWrapper(reflect.TypeOf((*combineFn)(nil)).Elem(), wrapMakerCombineFn)
+ reflectx.RegisterFunc(reflect.TypeOf((*func(accum,accum) (accum))(nil)).Elem(), funcMakerAccumAccumГAccum)
reflectx.RegisterFunc(reflect.TypeOf((*func(accum,typex.T) (accum))(nil)).Elem(), funcMakerAccumTypex۰TГAccum)
reflectx.RegisterFunc(reflect.TypeOf((*func(accum) ([]typex.T))(nil)).Elem(), funcMakerAccumГSliceofTypex۰T)
- reflectx.RegisterFunc(reflect.TypeOf((*func([]accum) (accum))(nil)).Elem(), funcMakerSliceofAccumГAccum)
reflectx.RegisterFunc(reflect.TypeOf((*func() (accum))(nil)).Elem(), funcMakerГAccum)
}
@@ -44,10 +44,36 @@ func wrapMakerCombineFn(fn interface{}) map[string]reflectx.Func {
"AddInput": reflectx.MakeFunc(func(a0 accum, a1 typex.T) (accum) { return dfn.AddInput(a0, a1) }),
"CreateAccumulator": reflectx.MakeFunc(func() (accum) { return dfn.CreateAccumulator() }),
"ExtractOutput": reflectx.MakeFunc(func(a0 accum) ([]typex.T) { return dfn.ExtractOutput(a0) }),
- "MergeAccumulators": reflectx.MakeFunc(func(a0 []accum) (accum) { return dfn.MergeAccumulators(a0) }),
+ "MergeAccumulators": reflectx.MakeFunc(func(a0 accum, a1 accum) (accum) { return dfn.MergeAccumulators(a0, a1) }),
}
}
+type callerAccumAccumГAccum struct {
+ fn func(accum,accum) (accum)
+}
+
+func funcMakerAccumAccumГAccum(fn interface{}) reflectx.Func {
+ f := fn.(func(accum,accum) (accum))
+ return &callerAccumAccumГAccum{fn: f}
+}
+
+func (c *callerAccumAccumГAccum) Name() string {
+ return reflectx.FunctionName(c.fn)
+}
+
+func (c *callerAccumAccumГAccum) Type() reflect.Type {
+ return reflect.TypeOf(c.fn)
+}
+
+func (c *callerAccumAccumГAccum) Call(args []interface{}) []interface{} {
+ out0 := c.fn(args[0].(accum), args[1].(accum))
+ return []interface{}{out0}
+}
+
+func (c *callerAccumAccumГAccum) Call2x1(arg0, arg1 interface{}) (interface{}) {
+ return c.fn(arg0.(accum), arg1.(accum))
+}
+
type callerAccumTypex۰TГAccum struct {
fn func(accum,typex.T) (accum)
}
@@ -100,32 +126,6 @@ func (c *callerAccumГSliceofTypex۰T) Call1x1(arg0 interface{}) (interface{}) {
return c.fn(arg0.(accum))
}
-type callerSliceofAccumГAccum struct {
- fn func([]accum) (accum)
-}
-
-func funcMakerSliceofAccumГAccum(fn interface{}) reflectx.Func {
- f := fn.(func([]accum) (accum))
- return &callerSliceofAccumГAccum{fn: f}
-}
-
-func (c *callerSliceofAccumГAccum) Name() string {
- return reflectx.FunctionName(c.fn)
-}
-
-func (c *callerSliceofAccumГAccum) Type() reflect.Type {
- return reflect.TypeOf(c.fn)
-}
-
-func (c *callerSliceofAccumГAccum) Call(args []interface{}) []interface{} {
- out0 := c.fn(args[0].([]accum))
- return []interface{}{out0}
-}
-
-func (c *callerSliceofAccumГAccum) Call1x1(arg0 interface{}) (interface{}) {
- return c.fn(arg0.([]accum))
-}
-
type callerГAccum struct {
fn func() (accum)
}
diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go
index 178965c..76130ec 100644
--- a/sdks/go/pkg/beam/transforms/top/top_test.go
+++ b/sdks/go/pkg/beam/transforms/top/top_test.go
@@ -16,10 +16,12 @@
package top
import (
+ "fmt"
"reflect"
"testing"
- "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
@@ -29,7 +31,7 @@ func TestCombineFn3String(t *testing.T) {
less := func(a, b string) bool {
return len(a) < len(b)
}
- fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}}
+ fn := newCombineFn(less, 3, typex.New(reflectx.String), false)
tests := []struct {
Elms []string
@@ -52,12 +54,12 @@ func TestCombineFn3String(t *testing.T) {
}
// TestCombineFn3RevString verifies that the accumulator correctly
-// maintains the top 3 shorest strings.
+// maintains the top 3 shortest strings.
func TestCombineFn3RevString(t *testing.T) {
less := func(a, b string) bool {
return len(a) < len(b)
}
- fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}, Reversed: true}
+ fn := newCombineFn(less, 3, typex.New(reflectx.String), true)
tests := []struct {
Elms []string
@@ -84,28 +86,30 @@ func TestCombineFnMerge(t *testing.T) {
less := func(a, b string) bool {
return len(a) < len(b)
}
- fn := &combineFn{N: 3, Less: beam.EncodedFunc{Fn: reflectx.MakeFunc(less)}}
-
+ fn := newCombineFn(less, 3, typex.New(reflectx.String), false)
tests := []struct {
Elms [][]string
Expected []string
}{
{[][]string{nil}, nil},
{[][]string{{"foo"}}, []string{"foo"}},
- {[][]string{{"1", "2"}, {"3"}, {"4", "5"}}, []string{"1", "2", "3"}},
+ {[][]string{{"1", "2"}, {"3"}, {"4", "5"}, {"6", "7"}}, []string{"1", "2", "3"}},
{[][]string{{"a1"}, {"b22", "c22"}, {"d333"}, {"e22"}}, []string{"d333", "b22", "c22"}},
+ {[][]string{{"a55555"}, {"b22", "c4444"}, {"d333"}, {"e22"}}, []string{"a55555", "c4444", "d333"}},
}
- for _, test := range tests {
- var list []accum
- for _, a := range test.Elms {
- list = append(list, load(fn, a...))
- }
-
- actual := output(fn, fn.MergeAccumulators(list))
- if !reflect.DeepEqual(actual, test.Expected) {
- t.Errorf("CombineFn(3; %v) = %v, want %v", test.Elms, actual, test.Expected)
- }
+ for i, test := range tests {
+ t.Run(fmt.Sprintf("%02d", i), func(t *testing.T) {
+ var list []accum
+ for _, a := range test.Elms {
+ list = append(list, load(fn, a...))
+ }
+ a := merge(t, fn, list...)
+ actual := output(fn, a)
+ if !reflect.DeepEqual(actual, test.Expected) {
+ t.Errorf("CombineFn(3; %v) = %v, want %v", test.Elms, actual, test.Expected)
+ }
+ })
}
}
@@ -117,6 +121,23 @@ func load(fn *combineFn, elms ...string) accum {
return a
}
+func merge(t *testing.T, fn *combineFn, as ...accum) accum {
+ t.Helper()
+ a := fn.CreateAccumulator()
+ for i, b := range as {
+ buf, err := b.MarshalJSON()
+ if err != nil {
+ t.Fatalf("failure marshalling accum[%d]: %v, %+v", i, err, b)
+ }
+ var c accum
+ if err := c.UnmarshalJSON(buf); err != nil {
+ t.Fatalf("failure unmarshalling accum[%d]: %v, %+v", i, err, b)
+ }
+ a = fn.MergeAccumulators(a, c)
+ }
+ return a
+}
+
func output(fn *combineFn, a accum) []string {
var ret []string
for _, actual := range fn.ExtractOutput(a) {