You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/10/08 16:02:27 UTC
[beam] branch master updated: [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e58d4e1 [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
e58d4e1 is described below
commit e58d4e1369874a488094d4473fd090d77ec7a9ae
Author: Jing <mi...@gmail.com>
AuthorDate: Thu Oct 8 09:01:49 2020 -0700
[BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
[BEAM-8017] Plumb errors and remove panics from package graphx
Exported methods and functions in package graphx are not backwards compatible: the APIs now return an extra error indicating the root cause of a failure.
---
sdks/go/pkg/beam/core/runtime/graphx/coder.go | 99 ++++---
sdks/go/pkg/beam/core/runtime/graphx/coder_test.go | 6 +-
sdks/go/pkg/beam/core/runtime/graphx/cogbk.go | 31 +-
sdks/go/pkg/beam/core/runtime/graphx/dataflow.go | 6 +-
sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 47 +--
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 327 +++++++++++++++------
sdks/go/pkg/beam/core/runtime/graphx/xlang.go | 12 +-
sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go | 25 +-
sdks/go/pkg/beam/core/runtime/xlangx/resolve.go | 6 +-
sdks/go/pkg/beam/core/runtime/xlangx/translate.go | 36 ++-
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 7 +-
sdks/go/pkg/beam/runners/universal/universal.go | 7 +-
12 files changed, 431 insertions(+), 178 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index c8be1d9..2c4bd43 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -76,10 +76,13 @@ func knownStandardCoders() []string {
}
// MarshalCoders marshals a list of coders into model coders.
-func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder) {
+func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, error) {
b := NewCoderMarshaller()
- ids := b.AddMulti(coders)
- return ids, b.Build()
+ if ids, err := b.AddMulti(coders); err != nil {
+ return nil, nil, err
+ } else {
+ return ids, b.Build(), nil
+ }
}
// UnmarshalCoders unmarshals coders.
@@ -160,19 +163,23 @@ func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error) {
return nil, err
}
- w := urnToWindowCoder(c.GetSpec().GetUrn())
+ w, err := urnToWindowCoder(c.GetSpec().GetUrn())
+ if err != nil {
+ return nil, errors.SetTopLevelMsgf(err, "failed to unmarshal window coder %v", id)
+ }
b.windowCoders[id] = w
return w, nil
}
-func urnToWindowCoder(urn string) *coder.WindowCoder {
+func urnToWindowCoder(urn string) (*coder.WindowCoder, error) {
switch urn {
case urnGlobalWindow:
- return coder.NewGlobalWindow()
+ return coder.NewGlobalWindow(), nil
case urnIntervalWindow:
- return coder.NewIntervalWindow()
+ return coder.NewIntervalWindow(), nil
default:
- panic(fmt.Sprintf("Failed to translate URN to window coder, unexpected URN: %v", urn))
+ err := errors.Errorf("unexpected URN %v for window coder", urn)
+ return nil, errors.WithContext(err, "translate URN to window coder")
}
}
@@ -422,19 +429,19 @@ func NewCoderMarshaller() *CoderMarshaller {
}
// Add adds the given coder to the set and returns its id. Idempotent.
-func (b *CoderMarshaller) Add(c *coder.Coder) string {
+func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
switch c.Kind {
case coder.Custom:
ref, err := encodeCustomCoder(c.Custom)
if err != nil {
typeName := c.Custom.Name
- panic(errors.SetTopLevelMsgf(err, "Failed to encode custom coder for type %s. "+
+ return "", errors.SetTopLevelMsgf(err, "failed to encode custom coder for type %s. "+
"Make sure the type was registered before calling beam.Init. For example: "+
- "beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName))
+ "beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName)
}
data, err := protox.EncodeBase64(ref)
if err != nil {
- panic(errors.Wrapf(err, "Failed to marshal custom coder %v", c))
+ return "", errors.Wrapf(err, "failed to marshal custom coder %v", c)
}
inner := b.internCoder(&pipepb.Coder{
Spec: &pipepb.FunctionSpec{
@@ -442,15 +449,20 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
Payload: []byte(data),
},
})
- return b.internBuiltInCoder(urnLengthPrefixCoder, inner)
+ return b.internBuiltInCoder(urnLengthPrefixCoder, inner), nil
case coder.KV:
- comp := b.AddMulti(c.Components)
- return b.internBuiltInCoder(urnKVCoder, comp...)
+ comp, err := b.AddMulti(c.Components)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to marshal KV coder %v", c)
+ }
+ return b.internBuiltInCoder(urnKVCoder, comp...), nil
case coder.CoGBK:
- comp := b.AddMulti(c.Components)
-
+ comp, err := b.AddMulti(c.Components)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to marshal CoGBK coder %v", c)
+ }
value := comp[1]
if len(comp) > 2 {
// TODO(BEAM-490): don't inject union coder for CoGBK.
@@ -461,62 +473,77 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
// SDKs always provide iterableCoder to runners, but can receive StateBackedIterables in return.
stream := b.internBuiltInCoder(urnIterableCoder, value)
- return b.internBuiltInCoder(urnKVCoder, comp[0], stream)
+ return b.internBuiltInCoder(urnKVCoder, comp[0], stream), nil
case coder.WindowedValue:
- comp := b.AddMulti(c.Components)
- comp = append(comp, b.AddWindowCoder(c.Window))
- return b.internBuiltInCoder(urnWindowedValueCoder, comp...)
+ comp := []string{}
+ if ids, err := b.AddMulti(c.Components); err != nil {
+ return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+ } else {
+ comp = append(comp, ids...)
+ }
+ if id, err := b.AddWindowCoder(c.Window); err != nil {
+ return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+ } else {
+ comp = append(comp, id)
+ }
+ return b.internBuiltInCoder(urnWindowedValueCoder, comp...), nil
case coder.Bytes:
// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
- return b.internBuiltInCoder(urnBytesCoder)
+ return b.internBuiltInCoder(urnBytesCoder), nil
case coder.Bool:
- return b.internBuiltInCoder(urnBoolCoder)
+ return b.internBuiltInCoder(urnBoolCoder), nil
case coder.VarInt:
- return b.internBuiltInCoder(urnVarIntCoder)
+ return b.internBuiltInCoder(urnVarIntCoder), nil
case coder.Double:
- return b.internBuiltInCoder(urnDoubleCoder)
+ return b.internBuiltInCoder(urnDoubleCoder), nil
case coder.String:
- return b.internBuiltInCoder(urnStringCoder)
+ return b.internBuiltInCoder(urnStringCoder), nil
case coder.Row:
rt := c.T.Type()
s, err := schema.FromType(rt)
if err != nil {
- panic(errors.SetTopLevelMsgf(err, "Failed to convert type %v to a schema.", rt))
+ return "", errors.SetTopLevelMsgf(err, "failed to convert type %v to a schema.", rt)
}
- return b.internRowCoder(s)
+ return b.internRowCoder(s), nil
// TODO(BEAM-10660): Handle coder.Timer support.
default:
- panic(fmt.Sprintf("Failed to marshal coder %v, unexpected coder kind: %v", c, c.Kind))
+ err := errors.Errorf("unexpected coder kind: %v", c.Kind)
+ return "", errors.WithContextf(err, "failed to marshal coder %v", c)
}
}
// AddMulti adds the given coders to the set and returns their ids. Idempotent.
-func (b *CoderMarshaller) AddMulti(list []*coder.Coder) []string {
+func (b *CoderMarshaller) AddMulti(list []*coder.Coder) ([]string, error) {
var ids []string
for _, c := range list {
- ids = append(ids, b.Add(c))
+ if id, err := b.Add(c); err != nil {
+ return nil, errors.Wrapf(err, "failed to marshal the coder %v.", c)
+ } else {
+ ids = append(ids, id)
+ }
}
- return ids
+ return ids, nil
}
// AddWindowCoder adds a window coder.
-func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string {
+func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) (string, error) {
switch w.Kind {
case coder.GlobalWindow:
- return b.internBuiltInCoder(urnGlobalWindow)
+ return b.internBuiltInCoder(urnGlobalWindow), nil
case coder.IntervalWindow:
- return b.internBuiltInCoder(urnIntervalWindow)
+ return b.internBuiltInCoder(urnIntervalWindow), nil
default:
- panic(fmt.Sprintf("Failed to add window coder %v, unexpected window kind: %v", w, w.Kind))
+ err := errors.Errorf("window coder with unexpected type %v", w.Kind)
+ return "", errors.WithContextf(err, "failed to unmarshal window coder %v", w)
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index cb9dcff..c781e30 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -107,7 +107,11 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- coders, err := graphx.UnmarshalCoders(graphx.MarshalCoders([]*coder.Coder{test.c}))
+ ids, marshalCoders, err := graphx.MarshalCoders([]*coder.Coder{test.c})
+ if err != nil {
+ t.Fatalf("Marshal(%v) failed: %v", test.c, err)
+ }
+ coders, err := graphx.UnmarshalCoders(ids, marshalCoders)
if err != nil {
t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", test.c, err)
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
index 0ec3ba8..ed5587e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
@@ -16,13 +16,12 @@
package graphx
import (
- "fmt"
-
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
// CoGBK support
@@ -70,37 +69,47 @@ const (
)
// MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
if gbk.Op != graph.CoGBK {
- panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+ err := errors.Errorf("expected CoGBK, got %v", gbk)
+ return nil, errors.WithContext(err, "failed to make KV Union coder")
}
from := gbk.Input[0].From
key := from.Coder.Components[0]
- return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
+ kvCoder, err := makeUnionCoder()
+ if err != nil {
+ return nil, errors.Wrapf(err, "failed to make KV Union coder.")
+ }
+ return coder.NewKV([]*coder.Coder{key, kvCoder}), nil
}
// MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
-func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
if gbk.Op != graph.CoGBK {
- panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+ err := errors.Errorf("expected CoGBK, got %v", gbk)
+ return nil, errors.WithContext(err, "failed to make GBK Union coder")
}
from := gbk.Input[0].From
key := from.Coder.Components[0]
- return coder.NewCoGBK([]*coder.Coder{key, makeUnionCoder()})
+ kvCoder, err := makeUnionCoder()
+ if err != nil {
+ return nil, errors.Wrapf(err, "failed to make GBK Union coder.")
+ }
+ return coder.NewCoGBK([]*coder.Coder{key, kvCoder}), nil
}
// makeUnionCoder returns a coder for the raw union value, KV<int,[]byte>. It uses
// varintz instead of the built-in varint to avoid the implicit length-prefixing
// of varint otherwise introduced by Dataflow.
-func makeUnionCoder() *coder.Coder {
+func makeUnionCoder() (*coder.Coder, error) {
c, err := coderx.NewVarIntZ(reflectx.Int)
if err != nil {
- panic(err)
+ return nil, err
}
return coder.NewKV([]*coder.Coder{
{Kind: coder.Custom, T: typex.New(reflectx.Int), Custom: c},
coder.NewBytes(),
- })
+ }), nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
index 30079e8..3f68e63 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -64,12 +64,12 @@ func WrapIterable(c *CoderRef) *CoderRef {
}
// WrapWindowed adds a windowed coder for Dataflow collections.
-func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) *CoderRef {
+func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) (*CoderRef, error) {
w, err := encodeWindowCoder(wc)
if err != nil {
- panic(err)
+ return nil, err
}
- return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, w}, IsWrapper: true}
+ return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, w}, IsWrapper: true}, nil
}
// EncodeCoderRefs returns the encoded forms understood by the runner.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 251f222..9a9094f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -16,7 +16,6 @@
package graphx
import (
- "fmt"
"reflect"
"strings"
@@ -63,7 +62,11 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error) {
}
for _, in := range edge.Input {
- kind := encodeInputKind(in.Kind)
+ kind, err := encodeInputKind(in.Kind)
+ if err != nil {
+ wrapped := errors.Wrap(err, "bad input type")
+ return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
+ }
t, err := encodeFullType(in.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad input type")
@@ -241,7 +244,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
typ, err := encodeType(t)
if err != nil {
wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv)
- panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u))
+ return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u)
}
data, err := jsonx.Marshal(u.Recv)
@@ -252,7 +255,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
return &v1pb.Fn{Type: typ, Opt: string(data)}, nil
default:
- panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
+ return nil, errors.Errorf("failed to encode DoFn %v, missing fn", u)
}
}
@@ -479,7 +482,11 @@ func encodeType(t reflect.Type) (*v1pb.Type, error) {
wrapped := errors.Wrap(err, "bad element type")
return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
}
- dir := encodeChanDir(t.ChanDir())
+ dir, err := encodeChanDir(t.ChanDir())
+ if err != nil {
+ wrapped := errors.Wrap(err, "bad channel direction")
+ return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
+ }
return &v1pb.Type{Kind: v1pb.Type_CHAN, Element: elm, ChanDir: dir}, nil
case reflect.Ptr:
@@ -725,16 +732,17 @@ func decodeInts(offsets []int32) []int {
return ret
}
-func encodeChanDir(dir reflect.ChanDir) v1pb.Type_ChanDir {
+func encodeChanDir(dir reflect.ChanDir) (v1pb.Type_ChanDir, error) {
switch dir {
case reflect.RecvDir:
- return v1pb.Type_RECV
+ return v1pb.Type_RECV, nil
case reflect.SendDir:
- return v1pb.Type_SEND
+ return v1pb.Type_SEND, nil
case reflect.BothDir:
- return v1pb.Type_BOTH
+ return v1pb.Type_BOTH, nil
default:
- panic(fmt.Sprintf("Failed to encode channel direction, invalid value: %v", dir))
+ err := errors.Errorf("invalid value: %v", dir)
+ return v1pb.Type_BOTH, errors.WithContextf(err, "encoding channel direction")
}
}
@@ -752,24 +760,25 @@ func decodeChanDir(dir v1pb.Type_ChanDir) (reflect.ChanDir, error) {
}
}
-func encodeInputKind(k graph.InputKind) v1pb.MultiEdge_Inbound_InputKind {
+func encodeInputKind(k graph.InputKind) (v1pb.MultiEdge_Inbound_InputKind, error) {
switch k {
case graph.Main:
- return v1pb.MultiEdge_Inbound_MAIN
+ return v1pb.MultiEdge_Inbound_MAIN, nil
case graph.Singleton:
- return v1pb.MultiEdge_Inbound_SINGLETON
+ return v1pb.MultiEdge_Inbound_SINGLETON, nil
case graph.Slice:
- return v1pb.MultiEdge_Inbound_SLICE
+ return v1pb.MultiEdge_Inbound_SLICE, nil
case graph.Map:
- return v1pb.MultiEdge_Inbound_MAP
+ return v1pb.MultiEdge_Inbound_MAP, nil
case graph.MultiMap:
- return v1pb.MultiEdge_Inbound_MULTIMAP
+ return v1pb.MultiEdge_Inbound_MULTIMAP, nil
case graph.Iter:
- return v1pb.MultiEdge_Inbound_ITER
+ return v1pb.MultiEdge_Inbound_ITER, nil
case graph.ReIter:
- return v1pb.MultiEdge_Inbound_REITER
+ return v1pb.MultiEdge_Inbound_REITER, nil
default:
- panic(fmt.Sprintf("Failed to encode input kind, invalid value: %v", k))
+ err := errors.Errorf("invalid value: %v", k)
+ return v1pb.MultiEdge_Inbound_MAIN, errors.WithContextf(err, "encoding input kind")
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d427e9f..70d39a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -77,12 +77,12 @@ func goCapabilities() []string {
}
// CreateEnvironment produces the appropriate payload for the type of environment.
-func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
+func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) {
var serializedPayload []byte
switch urn {
case "beam:env:process:v1":
// TODO Support process based SDK Harness.
- panic(fmt.Sprintf("Unsupported environment %v", urn))
+ return nil, errors.Errorf("unsupported environment %v", urn)
case "beam:env:external:v1":
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
@@ -107,7 +107,7 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
}),
},
},
- }
+ }, nil
}
// TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction.
@@ -124,10 +124,16 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) {
m := newMarshaller(opt)
for _, edge := range tree.Edges {
- m.addMultiEdge(edge)
+ _, err := m.addMultiEdge(edge)
+ if err != nil {
+ return nil, err
+ }
}
for _, t := range tree.Children {
- m.addScopeTree(t)
+ _, err := m.addScopeTree(t)
+ if err != nil {
+ return nil, err
+ }
}
p := &pipepb.Pipeline{
@@ -184,18 +190,26 @@ func (m *marshaller) getRequirements() []string {
return reqs
}
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
id := scopeID(s.Scope.Scope)
if _, exists := m.transforms[id]; exists {
- return id
+ return id, nil
}
var subtransforms []string
for _, edge := range s.Edges {
- subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
+ ids, err := m.addMultiEdge(edge)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+ }
+ subtransforms = append(subtransforms, ids...)
}
for _, tree := range s.Children {
- subtransforms = append(subtransforms, m.addScopeTree(tree))
+ id, err := m.addScopeTree(tree)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+ }
+ subtransforms = append(subtransforms, id)
}
transform := &pipepb.PTransform{
@@ -204,60 +218,89 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
EnvironmentId: m.addDefaultEnv(),
}
- m.updateIfCombineComposite(s, transform)
+ if err := m.updateIfCombineComposite(s, transform); err != nil {
+ return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+ }
m.transforms[id] = transform
- return id
+ return id, nil
}
// updateIfCombineComposite examines the scope tree and sets the PTransform Spec
// to be a CombinePerKey with a CombinePayload if it's a liftable composite.
// Beam Portability requires that composites contain an implementation for runners
// that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
if s.Scope.Name != graph.CombinePerKeyScope ||
len(s.Edges) != 2 ||
len(s.Edges[0].Edge.Input) != 1 ||
len(s.Edges[1].Edge.Output) != 1 ||
s.Edges[1].Edge.Op != graph.Combine {
- return
+ return nil
}
edge := s.Edges[1].Edge
- acID := m.coders.Add(edge.AccumCoder)
+ acID, err := m.coders.Add(edge.AccumCoder)
+ if err != nil {
+ return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
+ }
+ mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge)
+ if err != nil {
+ return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
+ }
payload := &pipepb.CombinePayload{
CombineFn: &pipepb.FunctionSpec{
Urn: URNDoFn,
- Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
+ Payload: []byte(mustEncodeMultiEdge),
},
AccumulatorCoderId: acID,
}
transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
+ return nil
}
-func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
+ handleErr := func(err error) ([]string, error) {
+ return nil, errors.Wrapf(err, "failed to add input kind: %v", edge)
+ }
id := edgeID(edge.Edge)
if _, exists := m.transforms[id]; exists {
- return []string{id}
+ return []string{id}, nil
}
switch {
case edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1:
- return []string{m.expandCoGBK(edge)}
+ cogbkID, err := m.expandCoGBK(edge)
+ if err != nil {
+ return handleErr(err)
+ }
+ return []string{cogbkID}, nil
case edge.Edge.Op == graph.Reshuffle:
- return []string{m.expandReshuffle(edge)}
+ reshuffleID, err := m.expandReshuffle(edge)
+ if err != nil {
+ return handleErr(err)
+ }
+ return []string{reshuffleID}, nil
case edge.Edge.Op == graph.External && edge.Edge.Payload == nil:
- return []string{m.expandCrossLanguage(edge)}
+ edgeID, err := m.expandCrossLanguage(edge)
+ if err != nil {
+ return handleErr(err)
+ }
+ return []string{edgeID}, nil
}
inputs := make(map[string]string)
for i, in := range edge.Edge.Input {
- m.addNode(in.From)
+ if _, err := m.addNode(in.From); err != nil {
+ return handleErr(err)
+ }
inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
}
outputs := make(map[string]string)
for i, out := range edge.Edge.Output {
- m.addNode(out.To)
+ if _, err := m.addNode(out.To); err != nil {
+ return handleErr(err)
+ }
outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
}
@@ -281,7 +324,13 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
// "", even if the input is already KV.
out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i)
- m.makeNode(out, m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
+ coderId, err := m.coders.Add(makeBytesKeyedCoder(in.From.Coder))
+ if err != nil {
+ return handleErr(err)
+ }
+ if _, err := m.makeNode(out, coderId, in.From); err != nil {
+ return handleErr(err)
+ }
payload := &pipepb.ParDoPayload{
DoFn: &pipepb.FunctionSpec{
@@ -322,31 +371,44 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
}
case graph.Map, graph.MultiMap:
- panic("NYI")
+ return nil, errors.Errorf("not implemented")
default:
- panic(fmt.Sprintf("unexpected input kind: %v", edge))
+ return nil, errors.Errorf("unexpected input kind: %v", edge)
}
}
+ mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+ if err != nil {
+ return handleErr(err)
+ }
+
payload := &pipepb.ParDoPayload{
DoFn: &pipepb.FunctionSpec{
Urn: URNDoFn,
- Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+ Payload: []byte(mustEncodeMultiEdge),
},
SideInputs: si,
}
if edge.Edge.DoFn.IsSplittable() {
- payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
+ coderId, err := m.coders.Add(edge.Edge.RestrictionCoder)
+ if err != nil {
+ return handleErr(err)
+ }
+ payload.RestrictionCoderId = coderId
m.requirements[URNRequiresSplittableDoFn] = true
}
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
case graph.Combine:
+ mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+ if err != nil {
+ return handleErr(err)
+ }
payload := &pipepb.ParDoPayload{
DoFn: &pipepb.FunctionSpec{
Urn: URNDoFn,
- Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+ Payload: []byte(mustEncodeMultiEdge),
},
}
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
@@ -358,8 +420,12 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
spec = &pipepb.FunctionSpec{Urn: URNGBK}
case graph.WindowInto:
+ windowFn, err := makeWindowFn(edge.Edge.WindowFn)
+ if err != nil {
+ return handleErr(err)
+ }
payload := &pipepb.WindowIntoPayload{
- WindowFn: makeWindowFn(edge.Edge.WindowFn),
+ WindowFn: windowFn,
}
spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}
@@ -367,7 +433,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: edge.Edge.Payload.Data}
default:
- panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
+ err := errors.Errorf("unexpected opcode: %v", edge.Edge.Op)
+ return handleErr(err)
}
var transformEnvID = ""
@@ -384,17 +451,19 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
}
m.transforms[id] = transform
allPIds = append(allPIds, id)
- return allPIds
+ return allPIds, nil
}
-func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
+func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) (string, error) {
edge := namedEdge.Edge
id := edgeID(edge)
inputs := make(map[string]string)
for tag, n := range ExternalInputs(edge) {
- m.addNode(n)
+ if _, err := m.addNode(n); err != nil {
+ return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+ }
// Ignore tag if it is a dummy SourceInputTag
if tag == graph.SourceInputTag {
tag = fmt.Sprintf("i%v", edge.External.InputsMap[tag])
@@ -420,33 +489,61 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
// map consumers of these outputs to the expanded transform's outputs.
outputs := make(map[string]string)
for i, out := range edge.Output {
- m.addNode(out.To)
+ if _, err := m.addNode(out.To); err != nil {
+ return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+ }
outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
}
transform.Outputs = outputs
- transform.EnvironmentId = ExpandedTransform(edge.External.Expanded).EnvironmentId
+ environment, err := ExpandedTransform(edge.External.Expanded)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+ }
+ transform.EnvironmentId = environment.EnvironmentId
}
m.transforms[id] = transform
- return id
+ return id, nil
}
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate
// CoGBK with multiple PCollections as described in cogbk.go.
+ handleErr := func(err error) (string, error) {
+ return "", errors.Wrapf(err, "failed to expand CoGBK transform for edge: %v", edge)
+ }
id := edgeID(edge.Edge)
- kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
- gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+ kvCoder, err := MakeKVUnionCoder(edge.Edge)
+ if err != nil {
+ return handleErr(err)
+ }
+ kvCoderID, err := m.coders.Add(kvCoder)
+ if err != nil {
+ return handleErr(err)
+ }
+ gbkCoder, err := MakeGBKUnionCoder(edge.Edge)
+ if err != nil {
+ return handleErr(err)
+ }
+ gbkCoderID, err := m.coders.Add(gbkCoder)
+ if err != nil {
+ return handleErr(err)
+ }
var subtransforms []string
inputs := make(map[string]string)
for i, in := range edge.Edge.Input {
- m.addNode(in.From)
+
+ if _, err := m.addNode(in.From); err != nil {
+ return handleErr(err)
+ }
out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
- m.makeNode(out, kvCoderID, in.From)
+ if _, err := m.makeNode(out, kvCoderID, in.From); err != nil {
+ return handleErr(err)
+ }
// Inject(i)
@@ -481,7 +578,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// Flatten
out := fmt.Sprintf("%v_flatten", nodeID(outNode))
- m.makeNode(out, kvCoderID, outNode)
+ if _, err := m.makeNode(out, kvCoderID, outNode); err != nil {
+ return handleErr(err)
+ }
flattenID := fmt.Sprintf("%v_flatten", id)
flatten := &pipepb.PTransform{
@@ -497,7 +596,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// CoGBK
gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
- m.makeNode(gbkOut, gbkCoderID, outNode)
+ if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+ return handleErr(err)
+ }
gbkID := fmt.Sprintf("%v_gbk", id)
gbk := &pipepb.PTransform{
@@ -511,7 +612,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// Expand
- m.addNode(outNode)
+ if _, err := m.addNode(outNode); err != nil {
+ return handleErr(err)
+ }
expandID := fmt.Sprintf("%v_expand", id)
payload := &pipepb.ParDoPayload{
@@ -543,7 +646,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
Subtransforms: subtransforms,
EnvironmentId: m.addDefaultEnv(),
}
- return cogbkID
+ return cogbkID, nil
}
// expandReshuffle translates resharding to a composite reshuffle
@@ -569,36 +672,62 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
// User code is able to write reshards, but it's easier to access
// the window coders framework side, which is critical for the reshard
// to function with unbounded inputs.
-func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
+ handleErr := func(err error) (string, error) {
+ return "", errors.Wrapf(err, "failed to expand Reshuffle transform for edge: %v", edge)
+ }
id := edgeID(edge.Edge)
- var kvCoderID, gbkCoderID string
- {
- kv := makeUnionCoder()
- kvCoderID = m.coders.Add(kv)
- gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+ kvCoder, err := makeUnionCoder()
+ if err != nil {
+ return handleErr(err)
+ }
+ kvCoderID, err := m.coders.Add(kvCoder)
+ if err != nil {
+ return handleErr(err)
+ }
+ gbkCoderID, err := m.coders.Add(coder.NewCoGBK(kvCoder.Components))
+ if err != nil {
+ return handleErr(err)
}
var subtransforms []string
in := edge.Edge.Input[0]
- origInput := m.addNode(in.From)
+ origInput, err := m.addNode(in.From)
+ if err != nil {
+ return handleErr(err)
+ }
// We need to preserve the old windowing/triggering here
// for re-instatement after the GBK.
preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
// Get the windowing strategy from before:
postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
- m.makeNode(postReify, kvCoderID, in.From)
+ if _, err := m.makeNode(postReify, kvCoderID, in.From); err != nil {
+ return handleErr(err)
+ }
// We need to replace postReify's windowing strategy with one appropriate
// for reshuffles.
{
wfn := window.NewGlobalWindows()
+ windowFn, err := makeWindowFn(wfn)
+ if err != nil {
+ return handleErr(err)
+ }
+ coderId, err := makeWindowCoder(wfn)
+ if err != nil {
+ return handleErr(err)
+ }
+ windowCoderId, err := m.coders.AddWindowCoder(coderId)
+ if err != nil {
+ return handleErr(err)
+ }
m.pcollections[postReify].WindowingStrategyId =
m.internWindowingStrategy(&pipepb.WindowingStrategy{
// Not segregated by time...
- WindowFn: makeWindowFn(wfn),
+ WindowFn: windowFn,
// ...output after every element is received...
Trigger: &pipepb.Trigger{
Trigger: &pipepb.Trigger_Always_{
@@ -614,7 +743,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
// TODO(BEAM-3304): migrate to user side operations once trigger support is in.
EnvironmentId: m.addDefaultEnv(),
MergeStatus: pipepb.MergeStatus_NON_MERGING,
- WindowCoderId: m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+ WindowCoderId: windowCoderId,
ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
AllowedLateness: 0,
OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
@@ -650,7 +779,9 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
// GBK
gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
- m.makeNode(gbkOut, gbkCoderID, outNode)
+ if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+ return handleErr(err)
+ }
gbkID := fmt.Sprintf("%v_gbk", id)
gbk := &pipepb.PTransform{
@@ -664,7 +795,10 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
// Expand
- outPCol := m.addNode(outNode)
+ outPCol, err := m.addNode(outNode)
+ if err != nil {
+ return handleErr(err)
+ }
m.pcollections[outPCol].WindowingStrategyId = preservedWSId
outputID := fmt.Sprintf("%v_unreify", id)
@@ -699,27 +833,37 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
},
EnvironmentId: m.addDefaultEnv(),
}
- return reshuffleID
+ return reshuffleID, nil
}
-func (m *marshaller) addNode(n *graph.Node) string {
+func (m *marshaller) addNode(n *graph.Node) (string, error) {
id := nodeID(n)
if _, exists := m.pcollections[id]; exists {
- return id
+ return id, nil
}
// TODO(herohde) 11/15/2017: expose UniqueName to user.
- return m.makeNode(id, m.coders.Add(n.Coder), n)
+ cid, err := m.coders.Add(n.Coder)
+ if err != nil {
+ return "", err
+ }
+ return m.makeNode(id, cid, n)
}
-func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
+func (m *marshaller) makeNode(id, cid string, n *graph.Node) (string, error) {
+ windowingStrategyId, err := m.addWindowingStrategy(n.WindowingStrategy())
+
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to make node %v with node id %v", n, id)
+ }
+
col := &pipepb.PCollection{
UniqueName: id,
CoderId: cid,
IsBounded: boolToBounded(n.Bounded()),
- WindowingStrategyId: m.addWindowingStrategy(n.WindowingStrategy()),
+ WindowingStrategyId: windowingStrategyId,
}
m.pcollections[id] = col
- return id
+ return id, nil
}
func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
@@ -737,10 +881,13 @@ func (m *marshaller) addDefaultEnv() string {
return id
}
-func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
- ws := marshalWindowingStrategy(m.coders, w)
+func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) (string, error) {
+ ws, err := marshalWindowingStrategy(m.coders, w)
+ if err != nil {
+ return "", errors.Wrapf(err, "failed to add window strategy %v", w)
+ }
ws.EnvironmentId = m.addDefaultEnv()
- return m.internWindowingStrategy(ws)
+ return m.internWindowingStrategy(ws), nil
}
func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) string {
@@ -757,12 +904,24 @@ func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) string
// marshalWindowingStrategy marshals the given windowing strategy in
// the given coder context.
-func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *pipepb.WindowingStrategy {
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (*pipepb.WindowingStrategy, error) {
+ windowFn, err := makeWindowFn(w.Fn)
+ if err != nil {
+ return nil, err
+ }
+ coderId, err := makeWindowCoder(w.Fn)
+ if err != nil {
+ return nil, err
+ }
+ windowCoderId, err := c.AddWindowCoder(coderId)
+ if err != nil {
+ return nil, err
+ }
ws := &pipepb.WindowingStrategy{
- WindowFn: makeWindowFn(w.Fn),
+ WindowFn: windowFn,
MergeStatus: pipepb.MergeStatus_NON_MERGING,
AccumulationMode: pipepb.AccumulationMode_DISCARDING,
- WindowCoderId: c.AddWindowCoder(makeWindowCoder(w.Fn)),
+ WindowCoderId: windowCoderId,
Trigger: &pipepb.Trigger{
Trigger: &pipepb.Trigger_Default_{
Default: &pipepb.Trigger_Default{},
@@ -773,15 +932,15 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *
AllowedLateness: 0,
OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
}
- return ws
+ return ws, nil
}
-func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
+func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
switch w.Kind {
case window.GlobalWindows:
return &pipepb.FunctionSpec{
Urn: URNGlobalWindowsWindowFn,
- }
+ }, nil
case window.FixedWindows:
return &pipepb.FunctionSpec{
Urn: URNFixedWindowsWindowFn,
@@ -790,7 +949,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
Size: ptypes.DurationProto(w.Size),
},
),
- }
+ }, nil
case window.SlidingWindows:
return &pipepb.FunctionSpec{
Urn: URNSlidingWindowsWindowFn,
@@ -800,7 +959,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
Period: ptypes.DurationProto(w.Period),
},
),
- }
+ }, nil
case window.Sessions:
return &pipepb.FunctionSpec{
Urn: URNSessionsWindowFn,
@@ -809,32 +968,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
GapSize: ptypes.DurationProto(w.Gap),
},
),
- }
+ }, nil
default:
- panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+ return nil, errors.Errorf("unexpected windowing strategy: %v", w)
}
}
-func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
+func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) {
switch w.Kind {
case window.GlobalWindows:
- return coder.NewGlobalWindow()
+ return coder.NewGlobalWindow(), nil
case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn:
- return coder.NewIntervalWindow()
+ return coder.NewIntervalWindow(), nil
default:
- panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+ return nil, errors.Errorf("unexpected windowing strategy: %v", w)
}
}
-func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
+func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) (string, error) {
ref, err := EncodeMultiEdge(edge)
if err != nil {
- panic(errors.Wrapf(err, "Failed to serialize %v", edge))
+ return "", errors.Wrapf(err, "failed to serialize %v", edge)
}
return protox.MustEncodeBase64(&v1pb.TransformPayload{
Urn: URNDoFn,
Edge: ref,
- })
+ }), nil
}
// makeBytesKeyedCoder returns KV<[]byte,A,> for any coder,
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
index cc1bb71..3311e46 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
@@ -23,20 +23,20 @@ import (
// ExpandedComponents type asserts the Components field with interface{} type
// and returns its pipeline component proto representation
-func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {
+func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error) {
if c, ok := exp.Components.(*pipepb.Components); ok {
- return c
+ return c, nil
}
- panic(errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp))
+ return nil, errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp)
}
// ExpandedTransform type asserts the Transform field with interface{} type
// and returns its pipeline ptransform proto representation
-func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform {
+func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, error) {
if t, ok := exp.Transform.(*pipepb.PTransform); ok {
- return t
+ return t, nil
}
- panic(errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp))
+ return nil, errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp)
}
// ExternalInputs returns the map (tag -> graph node representing the
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
index 98e415b..a71916e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
@@ -156,7 +156,10 @@ func TestExpandedTransform(t *testing.T) {
want := newTransform("x")
exp := &graph.ExpandedTransform{Transform: want}
- got := ExpandedTransform(exp)
+ got, err := ExpandedTransform(exp)
+ if err != nil {
+ t.Fatal(err)
+ }
if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
t.Errorf("diff (-want, +got): %v", d)
@@ -165,9 +168,12 @@ func TestExpandedTransform(t *testing.T) {
})
t.Run("Malformed PTransform", func(t *testing.T) {
- defer expectPanic(t, "string can't be type asserted into a pipeline PTransform")
exp := &graph.ExpandedTransform{Transform: "gibberish"}
- ExpandedTransform(exp)
+ expectedError := fmt.Sprintf("malformed transform; %v lacks a conforming pipeline ptransform", exp)
+ if _, actualError := ExpandedTransform(exp); actualError.Error() != expectedError {
+ t.Errorf("got error %v, want error %v", actualError, expectedError)
+ }
+
})
}
@@ -176,7 +182,11 @@ func TestExpandedComponents(t *testing.T) {
want := newComponents([]string{"x"})
exp := &graph.ExpandedTransform{Components: want}
- got := ExpandedComponents(exp)
+ got, err := ExpandedComponents(exp)
+
+ if err != nil {
+ t.Fatal(err)
+ }
if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
t.Errorf("diff (-want, +got): %v", d)
@@ -185,8 +195,11 @@ func TestExpandedComponents(t *testing.T) {
})
t.Run("Malformed Components", func(t *testing.T) {
- defer expectPanic(t, "string can't be type asserted into a pipeline Components")
exp := &graph.ExpandedTransform{Transform: "gibberish"}
- ExpandedComponents(exp)
+ expectedError := fmt.Sprintf("malformed components; %v lacks a conforming pipeline component", exp)
+ if _, actualError := ExpandedComponents(exp); actualError.Error() != expectedError {
+ t.Errorf("got error %v, want error %v", actualError, expectedError)
+ }
+
})
}
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index ec3c552..87dba0c 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -35,7 +35,11 @@ func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.P
}
for _, e := range edges {
if e.Op == graph.External {
- envs := graphx.ExpandedComponents(e.External.Expanded).Environments
+ components, err := graphx.ExpandedComponents(e.External.Expanded)
+ if err != nil {
+ panic(err)
+ }
+ envs := components.Environments
for eid, env := range envs {
if strings.HasPrefix(eid, "go") {
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
index 5a456dc..fb5abef 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
@@ -36,7 +36,10 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
p.Requirements = append(p.Requirements, exp.Requirements...)
// Adding components of the Expanded Transforms to the current Pipeline
- components := graphx.ExpandedComponents(exp)
+ components, err := graphx.ExpandedComponents(exp)
+ if err != nil {
+ panic(err)
+ }
for k, v := range components.GetTransforms() {
p.Components.Transforms[k] = v
}
@@ -60,8 +63,11 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
p.Components.Environments[k] = v
}
- p.Components.Transforms[id] = graphx.ExpandedTransform(exp)
-
+ transform, err := graphx.ExpandedTransform(exp)
+ if err != nil {
+ panic(err)
+ }
+ p.Components.Transforms[id] = transform
}
}
}
@@ -79,7 +85,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
for tag, n := range graphx.ExternalOutputs(e) {
nodeID := fmt.Sprintf("n%v", n.ID())
- expandedOutputs := graphx.ExpandedTransform(e.External.Expanded).GetOutputs()
+ transform, err := graphx.ExpandedTransform(e.External.Expanded)
+ if err != nil {
+ panic(err)
+ }
+ expandedOutputs := transform.GetOutputs()
var pcolID string
if tag == graph.SinkOutputTag {
for _, pcolID = range expandedOutputs {
@@ -109,7 +119,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
// VerifyNamedOutputs ensures the expanded outputs correspond to the correct and expected named outputs
func VerifyNamedOutputs(ext *graph.ExternalTransform) {
- expandedOutputs := graphx.ExpandedTransform(ext.Expanded).GetOutputs()
+ transform, err := graphx.ExpandedTransform(ext.Expanded)
+ if err != nil {
+ panic(err)
+ }
+ expandedOutputs := transform.GetOutputs()
if len(expandedOutputs) != len(ext.OutputsMap) {
panic(errors.Errorf("mismatched number of named outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap)))
@@ -131,8 +145,16 @@ func VerifyNamedOutputs(ext *graph.ExternalTransform) {
func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool)) {
ext := e.External
exp := ext.Expanded
- expandedPCollections := graphx.ExpandedComponents(exp).GetPcollections()
- expandedOutputs := graphx.ExpandedTransform(exp).GetOutputs()
+ components, err := graphx.ExpandedComponents(exp)
+ if err != nil {
+ panic(err)
+ }
+ expandedPCollections := components.GetPcollections()
+ transform, err := graphx.ExpandedTransform(exp)
+ if err != nil {
+ panic(err)
+ }
+ expandedOutputs := transform.GetOutputs()
for tag, node := range graphx.ExternalOutputs(e) {
var id string
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index f739da7..d830cc5 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -171,8 +171,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
if err != nil {
return err
}
- model, err := graphx.Marshal(edges, &graphx.Options{Environment: graphx.CreateEnvironment(
- ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)})
+ enviroment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+ if err != nil {
+ return errors.WithContext(err, "generating model pipeline")
+ }
+ model, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
if err != nil {
return errors.WithContext(err, "generating model pipeline")
}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 940f852..fac57b7 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -77,8 +77,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
getEnvCfg = srv.EnvironmentConfig
}
- pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: graphx.CreateEnvironment(
- ctx, envUrn, getEnvCfg)})
+ enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
+ if err != nil {
+ return errors.WithContextf(err, "generating model pipeline")
+ }
+ pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
if err != nil {
return errors.WithContextf(err, "generating model pipeline")
}