You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/10/08 16:02:27 UTC

[beam] branch master updated: [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e58d4e1  [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
e58d4e1 is described below

commit e58d4e1369874a488094d4473fd090d77ec7a9ae
Author: Jing <mi...@gmail.com>
AuthorDate: Thu Oct 8 09:01:49 2020 -0700

    [BEAM-8017] Plumb errors and remove panics from package graphx (#13028)
    
    [BEAM-8017] Plumb errors and remove panics from package graphx
    Exported methods and functions in package graphx are not backwards compatible, APIs return an extra error indicating roots of failures
---
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  99 ++++---
 sdks/go/pkg/beam/core/runtime/graphx/coder_test.go |   6 +-
 sdks/go/pkg/beam/core/runtime/graphx/cogbk.go      |  31 +-
 sdks/go/pkg/beam/core/runtime/graphx/dataflow.go   |   6 +-
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  |  47 +--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  | 327 +++++++++++++++------
 sdks/go/pkg/beam/core/runtime/graphx/xlang.go      |  12 +-
 sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go |  25 +-
 sdks/go/pkg/beam/core/runtime/xlangx/resolve.go    |   6 +-
 sdks/go/pkg/beam/core/runtime/xlangx/translate.go  |  36 ++-
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |   7 +-
 sdks/go/pkg/beam/runners/universal/universal.go    |   7 +-
 12 files changed, 431 insertions(+), 178 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index c8be1d9..2c4bd43 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -76,10 +76,13 @@ func knownStandardCoders() []string {
 }
 
 // MarshalCoders marshals a list of coders into model coders.
-func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder) {
+func MarshalCoders(coders []*coder.Coder) ([]string, map[string]*pipepb.Coder, error) {
 	b := NewCoderMarshaller()
-	ids := b.AddMulti(coders)
-	return ids, b.Build()
+	if ids, err := b.AddMulti(coders); err != nil {
+		return nil, nil, err
+	} else {
+		return ids, b.Build(), nil
+	}
 }
 
 // UnmarshalCoders unmarshals coders.
@@ -160,19 +163,23 @@ func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error) {
 		return nil, err
 	}
 
-	w := urnToWindowCoder(c.GetSpec().GetUrn())
+	w, err := urnToWindowCoder(c.GetSpec().GetUrn())
+	if err != nil {
+		return nil, errors.SetTopLevelMsgf(err, "failed to unmarshal window coder %v", id)
+	}
 	b.windowCoders[id] = w
 	return w, nil
 }
 
-func urnToWindowCoder(urn string) *coder.WindowCoder {
+func urnToWindowCoder(urn string) (*coder.WindowCoder, error) {
 	switch urn {
 	case urnGlobalWindow:
-		return coder.NewGlobalWindow()
+		return coder.NewGlobalWindow(), nil
 	case urnIntervalWindow:
-		return coder.NewIntervalWindow()
+		return coder.NewIntervalWindow(), nil
 	default:
-		panic(fmt.Sprintf("Failed to translate URN to window coder, unexpected URN: %v", urn))
+		err := errors.Errorf("unexpected URN %v for window coder", urn)
+		return nil, errors.WithContext(err, "translate URN to window coder")
 	}
 }
 
@@ -422,19 +429,19 @@ func NewCoderMarshaller() *CoderMarshaller {
 }
 
 // Add adds the given coder to the set and returns its id. Idempotent.
-func (b *CoderMarshaller) Add(c *coder.Coder) string {
+func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
 	switch c.Kind {
 	case coder.Custom:
 		ref, err := encodeCustomCoder(c.Custom)
 		if err != nil {
 			typeName := c.Custom.Name
-			panic(errors.SetTopLevelMsgf(err, "Failed to encode custom coder for type %s. "+
+			return "", errors.SetTopLevelMsgf(err, "failed to encode custom coder for type %s. "+
 				"Make sure the type was registered before calling beam.Init. For example: "+
-				"beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName))
+				"beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())", typeName)
 		}
 		data, err := protox.EncodeBase64(ref)
 		if err != nil {
-			panic(errors.Wrapf(err, "Failed to marshal custom coder %v", c))
+			return "", errors.Wrapf(err, "failed to marshal custom coder %v", c)
 		}
 		inner := b.internCoder(&pipepb.Coder{
 			Spec: &pipepb.FunctionSpec{
@@ -442,15 +449,20 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 				Payload: []byte(data),
 			},
 		})
-		return b.internBuiltInCoder(urnLengthPrefixCoder, inner)
+		return b.internBuiltInCoder(urnLengthPrefixCoder, inner), nil
 
 	case coder.KV:
-		comp := b.AddMulti(c.Components)
-		return b.internBuiltInCoder(urnKVCoder, comp...)
+		comp, err := b.AddMulti(c.Components)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to marshal KV coder %v", c)
+		}
+		return b.internBuiltInCoder(urnKVCoder, comp...), nil
 
 	case coder.CoGBK:
-		comp := b.AddMulti(c.Components)
-
+		comp, err := b.AddMulti(c.Components)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to marshal CoGBK coder %v", c)
+		}
 		value := comp[1]
 		if len(comp) > 2 {
 			// TODO(BEAM-490): don't inject union coder for CoGBK.
@@ -461,62 +473,77 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 
 		// SDKs always provide iterableCoder to runners, but can receive StateBackedIterables in return.
 		stream := b.internBuiltInCoder(urnIterableCoder, value)
-		return b.internBuiltInCoder(urnKVCoder, comp[0], stream)
+		return b.internBuiltInCoder(urnKVCoder, comp[0], stream), nil
 
 	case coder.WindowedValue:
-		comp := b.AddMulti(c.Components)
-		comp = append(comp, b.AddWindowCoder(c.Window))
-		return b.internBuiltInCoder(urnWindowedValueCoder, comp...)
+		comp := []string{}
+		if ids, err := b.AddMulti(c.Components); err != nil {
+			return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+		} else {
+			comp = append(comp, ids...)
+		}
+		if id, err := b.AddWindowCoder(c.Window); err != nil {
+			return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+		} else {
+			comp = append(comp, id)
+		}
+		return b.internBuiltInCoder(urnWindowedValueCoder, comp...), nil
 
 	case coder.Bytes:
 		// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
-		return b.internBuiltInCoder(urnBytesCoder)
+		return b.internBuiltInCoder(urnBytesCoder), nil
 
 	case coder.Bool:
-		return b.internBuiltInCoder(urnBoolCoder)
+		return b.internBuiltInCoder(urnBoolCoder), nil
 
 	case coder.VarInt:
-		return b.internBuiltInCoder(urnVarIntCoder)
+		return b.internBuiltInCoder(urnVarIntCoder), nil
 
 	case coder.Double:
-		return b.internBuiltInCoder(urnDoubleCoder)
+		return b.internBuiltInCoder(urnDoubleCoder), nil
 
 	case coder.String:
-		return b.internBuiltInCoder(urnStringCoder)
+		return b.internBuiltInCoder(urnStringCoder), nil
 
 	case coder.Row:
 		rt := c.T.Type()
 		s, err := schema.FromType(rt)
 		if err != nil {
-			panic(errors.SetTopLevelMsgf(err, "Failed to convert type %v to a schema.", rt))
+			return "", errors.SetTopLevelMsgf(err, "failed to convert type %v to a schema.", rt)
 		}
-		return b.internRowCoder(s)
+		return b.internRowCoder(s), nil
 
 	// TODO(BEAM-10660): Handle coder.Timer support.
 
 	default:
-		panic(fmt.Sprintf("Failed to marshal coder %v, unexpected coder kind: %v", c, c.Kind))
+		err := errors.Errorf("unexpected coder kind: %v", c.Kind)
+		return "", errors.WithContextf(err, "failed to marshal coder %v", c)
 	}
 }
 
 // AddMulti adds the given coders to the set and returns their ids. Idempotent.
-func (b *CoderMarshaller) AddMulti(list []*coder.Coder) []string {
+func (b *CoderMarshaller) AddMulti(list []*coder.Coder) ([]string, error) {
 	var ids []string
 	for _, c := range list {
-		ids = append(ids, b.Add(c))
+		if id, err := b.Add(c); err != nil {
+			return nil, errors.Wrapf(err, "failed to marshal the coder %v.", c)
+		} else {
+			ids = append(ids, id)
+		}
 	}
-	return ids
+	return ids, nil
 }
 
 // AddWindowCoder adds a window coder.
-func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string {
+func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) (string, error) {
 	switch w.Kind {
 	case coder.GlobalWindow:
-		return b.internBuiltInCoder(urnGlobalWindow)
+		return b.internBuiltInCoder(urnGlobalWindow), nil
 	case coder.IntervalWindow:
-		return b.internBuiltInCoder(urnIntervalWindow)
+		return b.internBuiltInCoder(urnIntervalWindow), nil
 	default:
-		panic(fmt.Sprintf("Failed to add window coder %v, unexpected window kind: %v", w, w.Kind))
+		err := errors.Errorf("window coder with unexpected type %v", w.Kind)
+		return "", errors.WithContextf(err, "failed to unmarshal window coder %v", w)
 	}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index cb9dcff..c781e30 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -107,7 +107,11 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			coders, err := graphx.UnmarshalCoders(graphx.MarshalCoders([]*coder.Coder{test.c}))
+			ids, marshalCoders, err := graphx.MarshalCoders([]*coder.Coder{test.c})
+			if err != nil {
+				t.Fatalf("Marshal(%v) failed: %v", test.c, err)
+			}
+			coders, err := graphx.UnmarshalCoders(ids, marshalCoders)
 			if err != nil {
 				t.Fatalf("Unmarshal(Marshal(%v)) failed: %v", test.c, err)
 			}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
index 0ec3ba8..ed5587e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
@@ -16,13 +16,12 @@
 package graphx
 
 import (
-	"fmt"
-
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CoGBK support
@@ -70,37 +69,47 @@ const (
 )
 
 // MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		err := errors.Errorf("expected CoGBK, got %v", gbk)
+		return nil, errors.WithContext(err, "failed to make KV Union coder")
 	}
 
 	from := gbk.Input[0].From
 	key := from.Coder.Components[0]
-	return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to make KV Union coder.")
+	}
+	return coder.NewKV([]*coder.Coder{key, kvCoder}), nil
 }
 
 // MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
-func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		err := errors.Errorf("expected CoGBK, got %v", gbk)
+		return nil, errors.WithContext(err, "failed to make GBK Union coder")
 	}
 
 	from := gbk.Input[0].From
 	key := from.Coder.Components[0]
-	return coder.NewCoGBK([]*coder.Coder{key, makeUnionCoder()})
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to make GBK Union coder.")
+	}
+	return coder.NewCoGBK([]*coder.Coder{key, kvCoder}), nil
 }
 
 // makeUnionCoder returns a coder for the raw union value, KV<int,[]byte>. It uses
 // varintz instead of the built-in varint to avoid the implicit length-prefixing
 // of varint otherwise introduced by Dataflow.
-func makeUnionCoder() *coder.Coder {
+func makeUnionCoder() (*coder.Coder, error) {
 	c, err := coderx.NewVarIntZ(reflectx.Int)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 	return coder.NewKV([]*coder.Coder{
 		{Kind: coder.Custom, T: typex.New(reflectx.Int), Custom: c},
 		coder.NewBytes(),
-	})
+	}), nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
index 30079e8..3f68e63 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -64,12 +64,12 @@ func WrapIterable(c *CoderRef) *CoderRef {
 }
 
 // WrapWindowed adds a windowed coder for Dataflow collections.
-func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) *CoderRef {
+func WrapWindowed(c *CoderRef, wc *coder.WindowCoder) (*CoderRef, error) {
 	w, err := encodeWindowCoder(wc)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
-	return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, w}, IsWrapper: true}
+	return &CoderRef{Type: windowedValueType, Components: []*CoderRef{c, w}, IsWrapper: true}, nil
 }
 
 // EncodeCoderRefs returns the encoded forms understood by the runner.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 251f222..9a9094f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -16,7 +16,6 @@
 package graphx
 
 import (
-	"fmt"
 	"reflect"
 	"strings"
 
@@ -63,7 +62,11 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1pb.MultiEdge, error) {
 	}
 
 	for _, in := range edge.Input {
-		kind := encodeInputKind(in.Kind)
+		kind, err := encodeInputKind(in.Kind)
+		if err != nil {
+			wrapped := errors.Wrap(err, "bad input type")
+			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
+		}
 		t, err := encodeFullType(in.Type)
 		if err != nil {
 			wrapped := errors.Wrap(err, "bad input type")
@@ -241,7 +244,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
 		typ, err := encodeType(t)
 		if err != nil {
 			wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv)
-			panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u))
+			return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u)
 		}
 
 		data, err := jsonx.Marshal(u.Recv)
@@ -252,7 +255,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
 		return &v1pb.Fn{Type: typ, Opt: string(data)}, nil
 
 	default:
-		panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
+		return nil, errors.Errorf("failed to encode DoFn %v, missing fn", u)
 	}
 }
 
@@ -479,7 +482,11 @@ func encodeType(t reflect.Type) (*v1pb.Type, error) {
 			wrapped := errors.Wrap(err, "bad element type")
 			return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
 		}
-		dir := encodeChanDir(t.ChanDir())
+		dir, err := encodeChanDir(t.ChanDir())
+		if err != nil {
+			wrapped := errors.Wrap(err, "bad channel direction")
+			return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
+		}
 		return &v1pb.Type{Kind: v1pb.Type_CHAN, Element: elm, ChanDir: dir}, nil
 
 	case reflect.Ptr:
@@ -725,16 +732,17 @@ func decodeInts(offsets []int32) []int {
 	return ret
 }
 
-func encodeChanDir(dir reflect.ChanDir) v1pb.Type_ChanDir {
+func encodeChanDir(dir reflect.ChanDir) (v1pb.Type_ChanDir, error) {
 	switch dir {
 	case reflect.RecvDir:
-		return v1pb.Type_RECV
+		return v1pb.Type_RECV, nil
 	case reflect.SendDir:
-		return v1pb.Type_SEND
+		return v1pb.Type_SEND, nil
 	case reflect.BothDir:
-		return v1pb.Type_BOTH
+		return v1pb.Type_BOTH, nil
 	default:
-		panic(fmt.Sprintf("Failed to encode channel direction, invalid value: %v", dir))
+		err := errors.Errorf("invalid value: %v", dir)
+		return v1pb.Type_BOTH, errors.WithContextf(err, "encoding channel direction")
 	}
 }
 
@@ -752,24 +760,25 @@ func decodeChanDir(dir v1pb.Type_ChanDir) (reflect.ChanDir, error) {
 	}
 }
 
-func encodeInputKind(k graph.InputKind) v1pb.MultiEdge_Inbound_InputKind {
+func encodeInputKind(k graph.InputKind) (v1pb.MultiEdge_Inbound_InputKind, error) {
 	switch k {
 	case graph.Main:
-		return v1pb.MultiEdge_Inbound_MAIN
+		return v1pb.MultiEdge_Inbound_MAIN, nil
 	case graph.Singleton:
-		return v1pb.MultiEdge_Inbound_SINGLETON
+		return v1pb.MultiEdge_Inbound_SINGLETON, nil
 	case graph.Slice:
-		return v1pb.MultiEdge_Inbound_SLICE
+		return v1pb.MultiEdge_Inbound_SLICE, nil
 	case graph.Map:
-		return v1pb.MultiEdge_Inbound_MAP
+		return v1pb.MultiEdge_Inbound_MAP, nil
 	case graph.MultiMap:
-		return v1pb.MultiEdge_Inbound_MULTIMAP
+		return v1pb.MultiEdge_Inbound_MULTIMAP, nil
 	case graph.Iter:
-		return v1pb.MultiEdge_Inbound_ITER
+		return v1pb.MultiEdge_Inbound_ITER, nil
 	case graph.ReIter:
-		return v1pb.MultiEdge_Inbound_REITER
+		return v1pb.MultiEdge_Inbound_REITER, nil
 	default:
-		panic(fmt.Sprintf("Failed to encode input kind, invalid value: %v", k))
+		err := errors.Errorf("invalid value: %v", k)
+		return v1pb.MultiEdge_Inbound_MAIN, errors.WithContextf(err, "encoding input kind")
 	}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d427e9f..70d39a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -77,12 +77,12 @@ func goCapabilities() []string {
 }
 
 // CreateEnvironment produces the appropriate payload for the type of environment.
-func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
+func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) {
 	var serializedPayload []byte
 	switch urn {
 	case "beam:env:process:v1":
 		// TODO Support process based SDK Harness.
-		panic(fmt.Sprintf("Unsupported environment %v", urn))
+		return nil, errors.Errorf("unsupported environment %v", urn)
 	case "beam:env:external:v1":
 		config := extractEnvironmentConfig(ctx)
 		payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
@@ -107,7 +107,7 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
 				}),
 			},
 		},
-	}
+	}, nil
 }
 
 // TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction.
@@ -124,10 +124,16 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) {
 
 	m := newMarshaller(opt)
 	for _, edge := range tree.Edges {
-		m.addMultiEdge(edge)
+		_, err := m.addMultiEdge(edge)
+		if err != nil {
+			return nil, err
+		}
 	}
 	for _, t := range tree.Children {
-		m.addScopeTree(t)
+		_, err := m.addScopeTree(t)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	p := &pipepb.Pipeline{
@@ -184,18 +190,26 @@ func (m *marshaller) getRequirements() []string {
 	return reqs
 }
 
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
 	id := scopeID(s.Scope.Scope)
 	if _, exists := m.transforms[id]; exists {
-		return id
+		return id, nil
 	}
 
 	var subtransforms []string
 	for _, edge := range s.Edges {
-		subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
+		ids, err := m.addMultiEdge(edge)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+		}
+		subtransforms = append(subtransforms, ids...)
 	}
 	for _, tree := range s.Children {
-		subtransforms = append(subtransforms, m.addScopeTree(tree))
+		id, err := m.addScopeTree(tree)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+		}
+		subtransforms = append(subtransforms, id)
 	}
 
 	transform := &pipepb.PTransform{
@@ -204,60 +218,89 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		EnvironmentId: m.addDefaultEnv(),
 	}
 
-	m.updateIfCombineComposite(s, transform)
+	if err := m.updateIfCombineComposite(s, transform); err != nil {
+		return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
+	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
 // updateIfCombineComposite examines the scope tree and sets the PTransform Spec
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
 	if s.Scope.Name != graph.CombinePerKeyScope ||
 		len(s.Edges) != 2 ||
 		len(s.Edges[0].Edge.Input) != 1 ||
 		len(s.Edges[1].Edge.Output) != 1 ||
 		s.Edges[1].Edge.Op != graph.Combine {
-		return
+		return nil
 	}
 
 	edge := s.Edges[1].Edge
-	acID := m.coders.Add(edge.AccumCoder)
+	acID, err := m.coders.Add(edge.AccumCoder)
+	if err != nil {
+		return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
+	}
+	mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge)
+	if err != nil {
+		return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
+	}
 	payload := &pipepb.CombinePayload{
 		CombineFn: &pipepb.FunctionSpec{
 			Urn:     URNDoFn,
-			Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
+			Payload: []byte(mustEncodeMultiEdge),
 		},
 		AccumulatorCoderId: acID,
 	}
 	transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
+	return nil
 }
 
-func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
+	handleErr := func(err error) ([]string, error) {
+		return nil, errors.Wrapf(err, "failed to add input kind: %v", edge)
+	}
 	id := edgeID(edge.Edge)
 	if _, exists := m.transforms[id]; exists {
-		return []string{id}
+		return []string{id}, nil
 	}
 
 	switch {
 	case edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1:
-		return []string{m.expandCoGBK(edge)}
+		cogbkID, err := m.expandCoGBK(edge)
+		if err != nil {
+			return handleErr(err)
+		}
+		return []string{cogbkID}, nil
 	case edge.Edge.Op == graph.Reshuffle:
-		return []string{m.expandReshuffle(edge)}
+		reshuffleID, err := m.expandReshuffle(edge)
+		if err != nil {
+			return handleErr(err)
+		}
+		return []string{reshuffleID}, nil
 	case edge.Edge.Op == graph.External && edge.Edge.Payload == nil:
-		return []string{m.expandCrossLanguage(edge)}
+		edgeID, err := m.expandCrossLanguage(edge)
+		if err != nil {
+			return handleErr(err)
+		}
+		return []string{edgeID}, nil
 	}
 
 	inputs := make(map[string]string)
 	for i, in := range edge.Edge.Input {
-		m.addNode(in.From)
+		if _, err := m.addNode(in.From); err != nil {
+			return handleErr(err)
+		}
 		inputs[fmt.Sprintf("i%v", i)] = nodeID(in.From)
 	}
 	outputs := make(map[string]string)
 	for i, out := range edge.Edge.Output {
-		m.addNode(out.To)
+		if _, err := m.addNode(out.To); err != nil {
+			return handleErr(err)
+		}
 		outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
 	}
 
@@ -281,7 +324,13 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 				// "", even if the input is already KV.
 
 				out := fmt.Sprintf("%v_keyed%v_%v", nodeID(in.From), edgeID(edge.Edge), i)
-				m.makeNode(out, m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
+				coderId, err := m.coders.Add(makeBytesKeyedCoder(in.From.Coder))
+				if err != nil {
+					return handleErr(err)
+				}
+				if _, err := m.makeNode(out, coderId, in.From); err != nil {
+					return handleErr(err)
+				}
 
 				payload := &pipepb.ParDoPayload{
 					DoFn: &pipepb.FunctionSpec{
@@ -322,31 +371,44 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 				}
 
 			case graph.Map, graph.MultiMap:
-				panic("NYI")
+				return nil, errors.Errorf("not implemented")
 
 			default:
-				panic(fmt.Sprintf("unexpected input kind: %v", edge))
+				return nil, errors.Errorf("unexpected input kind: %v", edge)
 			}
 		}
 
+		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+		if err != nil {
+			return handleErr(err)
+		}
+
 		payload := &pipepb.ParDoPayload{
 			DoFn: &pipepb.FunctionSpec{
 				Urn:     URNDoFn,
-				Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+				Payload: []byte(mustEncodeMultiEdge),
 			},
 			SideInputs: si,
 		}
 		if edge.Edge.DoFn.IsSplittable() {
-			payload.RestrictionCoderId = m.coders.Add(edge.Edge.RestrictionCoder)
+			coderId, err := m.coders.Add(edge.Edge.RestrictionCoder)
+			if err != nil {
+				return handleErr(err)
+			}
+			payload.RestrictionCoderId = coderId
 			m.requirements[URNRequiresSplittableDoFn] = true
 		}
 		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
 
 	case graph.Combine:
+		mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge.Edge)
+		if err != nil {
+			return handleErr(err)
+		}
 		payload := &pipepb.ParDoPayload{
 			DoFn: &pipepb.FunctionSpec{
 				Urn:     URNDoFn,
-				Payload: []byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+				Payload: []byte(mustEncodeMultiEdge),
 			},
 		}
 		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
@@ -358,8 +420,12 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 		spec = &pipepb.FunctionSpec{Urn: URNGBK}
 
 	case graph.WindowInto:
+		windowFn, err := makeWindowFn(edge.Edge.WindowFn)
+		if err != nil {
+			return handleErr(err)
+		}
 		payload := &pipepb.WindowIntoPayload{
-			WindowFn: makeWindowFn(edge.Edge.WindowFn),
+			WindowFn: windowFn,
 		}
 		spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}
 
@@ -367,7 +433,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 		spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: edge.Edge.Payload.Data}
 
 	default:
-		panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
+		err := errors.Errorf("unexpected opcode: %v", edge.Edge.Op)
+		return handleErr(err)
 	}
 
 	var transformEnvID = ""
@@ -384,17 +451,19 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 	}
 	m.transforms[id] = transform
 	allPIds = append(allPIds, id)
-	return allPIds
+	return allPIds, nil
 }
 
-func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
+func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) (string, error) {
 	edge := namedEdge.Edge
 	id := edgeID(edge)
 
 	inputs := make(map[string]string)
 
 	for tag, n := range ExternalInputs(edge) {
-		m.addNode(n)
+		if _, err := m.addNode(n); err != nil {
+			return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+		}
 		// Ignore tag if it is a dummy SourceInputTag
 		if tag == graph.SourceInputTag {
 			tag = fmt.Sprintf("i%v", edge.External.InputsMap[tag])
@@ -420,33 +489,61 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
 		// map consumers of these outputs to the expanded transform's outputs.
 		outputs := make(map[string]string)
 		for i, out := range edge.Output {
-			m.addNode(out.To)
+			if _, err := m.addNode(out.To); err != nil {
+				return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+			}
 			outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
 		}
 		transform.Outputs = outputs
-		transform.EnvironmentId = ExpandedTransform(edge.External.Expanded).EnvironmentId
+		environment, err := ExpandedTransform(edge.External.Expanded)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
+		}
+		transform.EnvironmentId = environment.EnvironmentId
 	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
 	// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate
 	// CoGBK with multiple PCollections as described in cogbk.go.
+	handleErr := func(err error) (string, error) {
+		return "", errors.Wrapf(err, "failed to expand CoGBK transform for edge: %v", edge)
+	}
 
 	id := edgeID(edge.Edge)
-	kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
-	gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+	kvCoder, err := MakeKVUnionCoder(edge.Edge)
+	if err != nil {
+		return handleErr(err)
+	}
+	kvCoderID, err := m.coders.Add(kvCoder)
+	if err != nil {
+		return handleErr(err)
+	}
+	gbkCoder, err := MakeGBKUnionCoder(edge.Edge)
+	if err != nil {
+		return handleErr(err)
+	}
+	gbkCoderID, err := m.coders.Add(gbkCoder)
+	if err != nil {
+		return handleErr(err)
+	}
 
 	var subtransforms []string
 
 	inputs := make(map[string]string)
 	for i, in := range edge.Edge.Input {
-		m.addNode(in.From)
+
+		if _, err := m.addNode(in.From); err != nil {
+			return handleErr(err)
+		}
 
 		out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
-		m.makeNode(out, kvCoderID, in.From)
+		if _, err := m.makeNode(out, kvCoderID, in.From); err != nil {
+			return handleErr(err)
+		}
 
 		// Inject(i)
 
@@ -481,7 +578,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 	// Flatten
 
 	out := fmt.Sprintf("%v_flatten", nodeID(outNode))
-	m.makeNode(out, kvCoderID, outNode)
+	if _, err := m.makeNode(out, kvCoderID, outNode); err != nil {
+		return handleErr(err)
+	}
 
 	flattenID := fmt.Sprintf("%v_flatten", id)
 	flatten := &pipepb.PTransform{
@@ -497,7 +596,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 	// CoGBK
 
 	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
-	m.makeNode(gbkOut, gbkCoderID, outNode)
+	if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+		return handleErr(err)
+	}
 
 	gbkID := fmt.Sprintf("%v_gbk", id)
 	gbk := &pipepb.PTransform{
@@ -511,7 +612,9 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 
 	// Expand
 
-	m.addNode(outNode)
+	if _, err := m.addNode(outNode); err != nil {
+		return handleErr(err)
+	}
 
 	expandID := fmt.Sprintf("%v_expand", id)
 	payload := &pipepb.ParDoPayload{
@@ -543,7 +646,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 		Subtransforms: subtransforms,
 		EnvironmentId: m.addDefaultEnv(),
 	}
-	return cogbkID
+	return cogbkID, nil
 }
 
 // expandReshuffle translates resharding to a composite reshuffle
@@ -569,36 +672,62 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 // User code is able to write reshards, but it's easier to access
 // the window coders framework side, which is critical for the reshard
 // to function with unbounded inputs.
-func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
+	handleErr := func(err error) (string, error) {
+		return "", errors.Wrapf(err, "failed to expand Reshuffle transform for edge: %v", edge)
+	}
 	id := edgeID(edge.Edge)
-	var kvCoderID, gbkCoderID string
-	{
-		kv := makeUnionCoder()
-		kvCoderID = m.coders.Add(kv)
-		gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return handleErr(err)
+	}
+	kvCoderID, err := m.coders.Add(kvCoder)
+	if err != nil {
+		return handleErr(err)
+	}
+	gbkCoderID, err := m.coders.Add(coder.NewCoGBK(kvCoder.Components))
+	if err != nil {
+		return handleErr(err)
 	}
 
 	var subtransforms []string
 
 	in := edge.Edge.Input[0]
 
-	origInput := m.addNode(in.From)
+	origInput, err := m.addNode(in.From)
+	if err != nil {
+		return handleErr(err)
+	}
 	// We need to preserve the old windowing/triggering here
 	// for re-instatement after the GBK.
 	preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
 
 	// Get the windowing strategy from before:
 	postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
-	m.makeNode(postReify, kvCoderID, in.From)
+	if _, err := m.makeNode(postReify, kvCoderID, in.From); err != nil {
+		return handleErr(err)
+	}
 
 	// We need to replace postReify's windowing strategy with one appropriate
 	// for reshuffles.
 	{
 		wfn := window.NewGlobalWindows()
+		windowFn, err := makeWindowFn(wfn)
+		if err != nil {
+			return handleErr(err)
+		}
+		coderId, err := makeWindowCoder(wfn)
+		if err != nil {
+			return handleErr(err)
+		}
+		windowCoderId, err := m.coders.AddWindowCoder(coderId)
+		if err != nil {
+			return handleErr(err)
+		}
 		m.pcollections[postReify].WindowingStrategyId =
 			m.internWindowingStrategy(&pipepb.WindowingStrategy{
 				// Not segregated by time...
-				WindowFn: makeWindowFn(wfn),
+				WindowFn: windowFn,
 				// ...output after every element is received...
 				Trigger: &pipepb.Trigger{
 					Trigger: &pipepb.Trigger_Always_{
@@ -614,7 +743,7 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
 				// TODO(BEAM-3304): migrate to user side operations once trigger support is in.
 				EnvironmentId:   m.addDefaultEnv(),
 				MergeStatus:     pipepb.MergeStatus_NON_MERGING,
-				WindowCoderId:   m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+				WindowCoderId:   windowCoderId,
 				ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
 				AllowedLateness: 0,
 				OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
@@ -650,7 +779,9 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
 	// GBK
 
 	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
-	m.makeNode(gbkOut, gbkCoderID, outNode)
+	if _, err := m.makeNode(gbkOut, gbkCoderID, outNode); err != nil {
+		return handleErr(err)
+	}
 
 	gbkID := fmt.Sprintf("%v_gbk", id)
 	gbk := &pipepb.PTransform{
@@ -664,7 +795,10 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
 
 	// Expand
 
-	outPCol := m.addNode(outNode)
+	outPCol, err := m.addNode(outNode)
+	if err != nil {
+		return handleErr(err)
+	}
 	m.pcollections[outPCol].WindowingStrategyId = preservedWSId
 
 	outputID := fmt.Sprintf("%v_unreify", id)
@@ -699,27 +833,37 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
 		},
 		EnvironmentId: m.addDefaultEnv(),
 	}
-	return reshuffleID
+	return reshuffleID, nil
 }
 
-func (m *marshaller) addNode(n *graph.Node) string {
+func (m *marshaller) addNode(n *graph.Node) (string, error) {
 	id := nodeID(n)
 	if _, exists := m.pcollections[id]; exists {
-		return id
+		return id, nil
 	}
 	// TODO(herohde) 11/15/2017: expose UniqueName to user.
-	return m.makeNode(id, m.coders.Add(n.Coder), n)
+	cid, err := m.coders.Add(n.Coder)
+	if err != nil {
+		return "", err
+	}
+	return m.makeNode(id, cid, n)
 }
 
-func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
+func (m *marshaller) makeNode(id, cid string, n *graph.Node) (string, error) {
+	windowingStrategyId, err := m.addWindowingStrategy(n.WindowingStrategy())
+
+	if err != nil {
+		return "", errors.Wrapf(err, "failed to make node %v with node id %v", n, id)
+	}
+
 	col := &pipepb.PCollection{
 		UniqueName:          id,
 		CoderId:             cid,
 		IsBounded:           boolToBounded(n.Bounded()),
-		WindowingStrategyId: m.addWindowingStrategy(n.WindowingStrategy()),
+		WindowingStrategyId: windowingStrategyId,
 	}
 	m.pcollections[id] = col
-	return id
+	return id, nil
 }
 
 func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
@@ -737,10 +881,13 @@ func (m *marshaller) addDefaultEnv() string {
 	return id
 }
 
-func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
-	ws := marshalWindowingStrategy(m.coders, w)
+func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) (string, error) {
+	ws, err := marshalWindowingStrategy(m.coders, w)
+	if err != nil {
+		return "", errors.Wrapf(err, "failed to add window strategy %v", w)
+	}
 	ws.EnvironmentId = m.addDefaultEnv()
-	return m.internWindowingStrategy(ws)
+	return m.internWindowingStrategy(ws), nil
 }
 
 func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) string {
@@ -757,12 +904,24 @@ func (m *marshaller) internWindowingStrategy(w *pipepb.WindowingStrategy) string
 
 // marshalWindowingStrategy marshals the given windowing strategy in
 // the given coder context.
-func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *pipepb.WindowingStrategy {
+func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (*pipepb.WindowingStrategy, error) {
+	windowFn, err := makeWindowFn(w.Fn)
+	if err != nil {
+		return nil, err
+	}
+	coderId, err := makeWindowCoder(w.Fn)
+	if err != nil {
+		return nil, err
+	}
+	windowCoderId, err := c.AddWindowCoder(coderId)
+	if err != nil {
+		return nil, err
+	}
 	ws := &pipepb.WindowingStrategy{
-		WindowFn:         makeWindowFn(w.Fn),
+		WindowFn:         windowFn,
 		MergeStatus:      pipepb.MergeStatus_NON_MERGING,
 		AccumulationMode: pipepb.AccumulationMode_DISCARDING,
-		WindowCoderId:    c.AddWindowCoder(makeWindowCoder(w.Fn)),
+		WindowCoderId:    windowCoderId,
 		Trigger: &pipepb.Trigger{
 			Trigger: &pipepb.Trigger_Default_{
 				Default: &pipepb.Trigger_Default{},
@@ -773,15 +932,15 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) *
 		AllowedLateness: 0,
 		OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
 	}
-	return ws
+	return ws, nil
 }
 
-func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
+func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
 	switch w.Kind {
 	case window.GlobalWindows:
 		return &pipepb.FunctionSpec{
 			Urn: URNGlobalWindowsWindowFn,
-		}
+		}, nil
 	case window.FixedWindows:
 		return &pipepb.FunctionSpec{
 			Urn: URNFixedWindowsWindowFn,
@@ -790,7 +949,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					Size: ptypes.DurationProto(w.Size),
 				},
 			),
-		}
+		}, nil
 	case window.SlidingWindows:
 		return &pipepb.FunctionSpec{
 			Urn: URNSlidingWindowsWindowFn,
@@ -800,7 +959,7 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					Period: ptypes.DurationProto(w.Period),
 				},
 			),
-		}
+		}, nil
 	case window.Sessions:
 		return &pipepb.FunctionSpec{
 			Urn: URNSessionsWindowFn,
@@ -809,32 +968,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					GapSize: ptypes.DurationProto(w.Gap),
 				},
 			),
-		}
+		}, nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("unexpected windowing strategy: %v", w)
 	}
 }
 
-func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
+func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) {
 	switch w.Kind {
 	case window.GlobalWindows:
-		return coder.NewGlobalWindow()
+		return coder.NewGlobalWindow(), nil
 	case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn:
-		return coder.NewIntervalWindow()
+		return coder.NewIntervalWindow(), nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("unexpected windowing strategy: %v", w)
 	}
 }
 
-func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
+func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) (string, error) {
 	ref, err := EncodeMultiEdge(edge)
 	if err != nil {
-		panic(errors.Wrapf(err, "Failed to serialize %v", edge))
+		return "", errors.Wrapf(err, "failed to serialize %v", edge)
 	}
 	return protox.MustEncodeBase64(&v1pb.TransformPayload{
 		Urn:  URNDoFn,
 		Edge: ref,
-	})
+	}), nil
 }
 
 // makeBytesKeyedCoder returns KV<[]byte,A,> for any coder,
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
index cc1bb71..3311e46 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang.go
@@ -23,20 +23,20 @@ import (
 
 // ExpandedComponents type asserts the Components field with interface{} type
 // and returns its pipeline component proto representation
-func ExpandedComponents(exp *graph.ExpandedTransform) *pipepb.Components {
+func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, error) {
 	if c, ok := exp.Components.(*pipepb.Components); ok {
-		return c
+		return c, nil
 	}
-	panic(errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp))
+	return nil, errors.Errorf("malformed components; %v lacks a conforming pipeline component", exp)
 }
 
 // ExpandedTransform type asserts the Transform field with interface{} type
 // and returns its pipeline ptransform proto representation
-func ExpandedTransform(exp *graph.ExpandedTransform) *pipepb.PTransform {
+func ExpandedTransform(exp *graph.ExpandedTransform) (*pipepb.PTransform, error) {
 	if t, ok := exp.Transform.(*pipepb.PTransform); ok {
-		return t
+		return t, nil
 	}
-	panic(errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp))
+	return nil, errors.Errorf("malformed transform; %v lacks a conforming pipeline ptransform", exp)
 }
 
 // ExternalInputs returns the map (tag -> graph node representing the
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
index 98e415b..a71916e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
@@ -156,7 +156,10 @@ func TestExpandedTransform(t *testing.T) {
 		want := newTransform("x")
 		exp := &graph.ExpandedTransform{Transform: want}
 
-		got := ExpandedTransform(exp)
+		got, err := ExpandedTransform(exp)
+		if err != nil {
+			t.Fatal(err)
+		}
 
 		if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
 			t.Errorf("diff (-want, +got): %v", d)
@@ -165,9 +168,12 @@ func TestExpandedTransform(t *testing.T) {
 	})
 
 	t.Run("Malformed PTransform", func(t *testing.T) {
-		defer expectPanic(t, "string can't be type asserted into a pipeline PTransform")
 		exp := &graph.ExpandedTransform{Transform: "gibberish"}
-		ExpandedTransform(exp)
+		expectedError := fmt.Sprintf("malformed transform; %v lacks a conforming pipeline ptransform", exp)
+		if _, actualError := ExpandedTransform(exp); actualError.Error() != expectedError {
+			t.Errorf("got error %v, want error %v", actualError, expectedError)
+		}
+
 	})
 }
 
@@ -176,7 +182,11 @@ func TestExpandedComponents(t *testing.T) {
 		want := newComponents([]string{"x"})
 		exp := &graph.ExpandedTransform{Components: want}
 
-		got := ExpandedComponents(exp)
+		got, err := ExpandedComponents(exp)
+
+		if err != nil {
+			t.Fatal(err)
+		}
 
 		if d := cmp.Diff(want, got, protocmp.Transform()); d != "" {
 			t.Errorf("diff (-want, +got): %v", d)
@@ -185,8 +195,11 @@ func TestExpandedComponents(t *testing.T) {
 	})
 
 	t.Run("Malformed Components", func(t *testing.T) {
-		defer expectPanic(t, "string can't be type asserted into a pipeline Components")
 		exp := &graph.ExpandedTransform{Transform: "gibberish"}
-		ExpandedComponents(exp)
+		expectedError := fmt.Sprintf("malformed components; %v lacks a conforming pipeline component", exp)
+		if _, actualError := ExpandedComponents(exp); actualError.Error() != expectedError {
+			t.Errorf("got error %v, want error %v", actualError, expectedError)
+		}
+
 	})
 }
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index ec3c552..87dba0c 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -35,7 +35,11 @@ func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.P
 	}
 	for _, e := range edges {
 		if e.Op == graph.External {
-			envs := graphx.ExpandedComponents(e.External.Expanded).Environments
+			components, err := graphx.ExpandedComponents(e.External.Expanded)
+			if err != nil {
+				panic(err)
+			}
+			envs := components.Environments
 			for eid, env := range envs {
 
 				if strings.HasPrefix(eid, "go") {
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
index 5a456dc..fb5abef 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/translate.go
@@ -36,7 +36,10 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 			p.Requirements = append(p.Requirements, exp.Requirements...)
 
 			// Adding components of the Expanded Transforms to the current Pipeline
-			components := graphx.ExpandedComponents(exp)
+			components, err := graphx.ExpandedComponents(exp)
+			if err != nil {
+				panic(err)
+			}
 			for k, v := range components.GetTransforms() {
 				p.Components.Transforms[k] = v
 			}
@@ -60,8 +63,11 @@ func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 				p.Components.Environments[k] = v
 			}
 
-			p.Components.Transforms[id] = graphx.ExpandedTransform(exp)
-
+			transform, err := graphx.ExpandedTransform(exp)
+			if err != nil {
+				panic(err)
+			}
+			p.Components.Transforms[id] = transform
 		}
 	}
 }
@@ -79,7 +85,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 			for tag, n := range graphx.ExternalOutputs(e) {
 				nodeID := fmt.Sprintf("n%v", n.ID())
 
-				expandedOutputs := graphx.ExpandedTransform(e.External.Expanded).GetOutputs()
+				transform, err := graphx.ExpandedTransform(e.External.Expanded)
+				if err != nil {
+					panic(err)
+				}
+				expandedOutputs := transform.GetOutputs()
 				var pcolID string
 				if tag == graph.SinkOutputTag {
 					for _, pcolID = range expandedOutputs {
@@ -109,7 +119,11 @@ func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
 
 // VerifyNamedOutputs ensures the expanded outputs correspond to the correct and expected named outputs
 func VerifyNamedOutputs(ext *graph.ExternalTransform) {
-	expandedOutputs := graphx.ExpandedTransform(ext.Expanded).GetOutputs()
+	transform, err := graphx.ExpandedTransform(ext.Expanded)
+	if err != nil {
+		panic(err)
+	}
+	expandedOutputs := transform.GetOutputs()
 
 	if len(expandedOutputs) != len(ext.OutputsMap) {
 		panic(errors.Errorf("mismatched number of named outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), len(ext.OutputsMap)))
@@ -131,8 +145,16 @@ func VerifyNamedOutputs(ext *graph.ExternalTransform) {
 func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater func(*graph.Node, bool)) {
 	ext := e.External
 	exp := ext.Expanded
-	expandedPCollections := graphx.ExpandedComponents(exp).GetPcollections()
-	expandedOutputs := graphx.ExpandedTransform(exp).GetOutputs()
+	components, err := graphx.ExpandedComponents(exp)
+	if err != nil {
+		panic(err)
+	}
+	expandedPCollections := components.GetPcollections()
+	transform, err := graphx.ExpandedTransform(exp)
+	if err != nil {
+		panic(err)
+	}
+	expandedOutputs := transform.GetOutputs()
 
 	for tag, node := range graphx.ExternalOutputs(e) {
 		var id string
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index f739da7..d830cc5 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -171,8 +171,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	if err != nil {
 		return err
 	}
-	model, err := graphx.Marshal(edges, &graphx.Options{Environment: graphx.CreateEnvironment(
-		ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)})
+	enviroment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+	if err != nil {
+		return errors.WithContext(err, "generating model pipeline")
+	}
+	model, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
 	if err != nil {
 		return errors.WithContext(err, "generating model pipeline")
 	}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 940f852..fac57b7 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -77,8 +77,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		getEnvCfg = srv.EnvironmentConfig
 	}
 
-	pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: graphx.CreateEnvironment(
-		ctx, envUrn, getEnvCfg)})
+	enviroment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
+	if err != nil {
+		return errors.WithContextf(err, "generating model pipeline")
+	}
+	pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: enviroment})
 	if err != nil {
 		return errors.WithContextf(err, "generating model pipeline")
 	}