You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/03/21 22:14:53 UTC

[beam] branch master updated: [BEAM-6838] Go SDK: Improving error msgs in pipeline serialization.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b82483  [BEAM-6838] Go SDK: Improving error msgs in pipeline serialization.
     new f45974b  Merge pull request #8064 from youngoli/beam-6838
1b82483 is described below

commit 1b8248357be59590dc0cfc6bfeef6ce7490dcc88
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Thu Mar 14 16:29:08 2019 -0700

    [BEAM-6838] Go SDK: Improving error msgs in pipeline serialization.
---
 sdks/go/pkg/beam/core/runtime/graphx/coder.go     |  30 +++---
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 108 +++++++++++-----------
 2 files changed, 71 insertions(+), 67 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 6b3f7dd..43458f9 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -140,7 +140,7 @@ func urnToWindowCoder(urn string) *coder.WindowCoder {
 	case urnIntervalWindow:
 		return coder.NewIntervalWindow()
 	default:
-		panic(fmt.Sprintf("Unexpected window coder: %v", urn))
+		panic(fmt.Sprintf("Failed to translate URN to window coder, unexpected URN: %v", urn))
 	}
 }
 
@@ -157,7 +157,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnKVCoder:
 		if len(components) != 2 {
-			return nil, fmt.Errorf("bad pair: %v", c)
+			return nil, fmt.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
 		}
 
 		key, err := b.Coder(components[0])
@@ -206,7 +206,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnLengthPrefixCoder:
 		if len(components) != 1 {
-			return nil, fmt.Errorf("bad length prefix: %v", c)
+			return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components))
 		}
 
 		elm, err := b.peek(components[0])
@@ -217,7 +217,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 		// the portable pipeline model directly (BEAM-2885)
 		if elm.GetSpec().GetSpec().GetUrn() != "" && elm.GetSpec().GetSpec().GetUrn() != urnCustomCoder {
 			// TODO(herohde) 11/17/2017: revisit this restriction
-			return nil, fmt.Errorf("expected length prefix of custom coder only: %v", elm)
+			return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a custom coder as a sub component but got %v", c, elm)
 		}
 
 		var ref v1.CustomCoder
@@ -233,7 +233,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnWindowedValueCoder:
 		if len(components) != 2 {
-			return nil, fmt.Errorf("bad windowed value: %v", c)
+			return nil, fmt.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components))
 		}
 
 		elm, err := b.Coder(components[0])
@@ -248,7 +248,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 		return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil
 
 	case streamType:
-		return nil, fmt.Errorf("stream must be pair value: %v", c)
+		return nil, fmt.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c)
 
 	case "":
 		// TODO(herohde) 11/27/2017: we still see CoderRefs from Dataflow. Handle that
@@ -258,16 +258,16 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 		var ref CoderRef
 		if err := json.Unmarshal(payload, &ref); err != nil {
-			return nil, fmt.Errorf("failed to decode urn-less coder payload \"%v\": %v", string(payload), err)
+			return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\": %v", c, string(payload), err)
 		}
 		c, err := DecodeCoderRef(&ref)
 		if err != nil {
-			return nil, fmt.Errorf("failed to translate coder \"%v\": %v", string(payload), err)
+			return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\": %v", c, string(payload), err)
 		}
 		return c, nil
 
 	default:
-		return nil, fmt.Errorf("custom coders must be length prefixed: %v", c)
+		return nil, fmt.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn)
 	}
 }
 
@@ -318,11 +318,15 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 	case coder.Custom:
 		ref, err := encodeCustomCoder(c.Custom)
 		if err != nil {
-			panic(fmt.Sprintf("failed to encode custom coder: %v", err))
+			typeName := c.Custom.Name
+			panic(fmt.Sprintf("Failed to encode custom coder for type %s. "+
+				"Make sure the type was registered before calling beam.Init. For example: "+
+				"beam.RegisterType(reflect.TypeOf((*TypeName)(nil)).Elem())\n\n"+
+				"Full error: %v", typeName, err))
 		}
 		data, err := protox.EncodeBase64(ref)
 		if err != nil {
-			panic(fmt.Sprintf("failed to marshal custom coder: %v", err))
+			panic(fmt.Sprintf("Failed to marshal custom coder %v: %v", c, err))
 		}
 		inner := b.internCoder(&pb.Coder{
 			Spec: &pb.SdkFunctionSpec{
@@ -366,7 +370,7 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 		return b.internBuiltInCoder(urnVarIntCoder)
 
 	default:
-		panic(fmt.Sprintf("Unexpected coder kind: %v", c.Kind))
+		panic(fmt.Sprintf("Failed to marshal custom coder %v, unexpected coder kind: %v", c, c.Kind))
 	}
 }
 
@@ -387,7 +391,7 @@ func (b *CoderMarshaller) AddWindowCoder(w *coder.WindowCoder) string {
 	case coder.IntervalWindow:
 		return b.internBuiltInCoder(urnIntervalWindow)
 	default:
-		panic(fmt.Sprintf("Unexpected window kind: %v", w.Kind))
+		panic(fmt.Sprintf("Failed to add window coder %v, unexpected window kind: %v", w, w.Kind))
 	}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index d893a93..5f0e944 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -43,14 +43,14 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
 	if edge.DoFn != nil {
 		ref, err := encodeFn((*graph.Fn)(edge.DoFn))
 		if err != nil {
-			return nil, fmt.Errorf("encode: bad userfn: %v", err)
+			return nil, fmt.Errorf("failed to encode userfn %v, bad userfn: %v", edge, err)
 		}
 		ret.Fn = ref
 	}
 	if edge.CombineFn != nil {
 		ref, err := encodeFn((*graph.Fn)(edge.CombineFn))
 		if err != nil {
-			return nil, fmt.Errorf("encode: bad combinefn: %v", err)
+			return nil, fmt.Errorf("failed to encode userfn %v, bad combinefn: %v", edge, err)
 		}
 		ret.Fn = ref
 	}
@@ -62,14 +62,14 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
 		kind := encodeInputKind(in.Kind)
 		t, err := encodeFullType(in.Type)
 		if err != nil {
-			return nil, fmt.Errorf("encode: bad input type: %v", err)
+			return nil, fmt.Errorf("failed to encode userfn %v, bad input type: %v", edge, err)
 		}
 		ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: kind, Type: t})
 	}
 	for _, out := range edge.Output {
 		t, err := encodeFullType(out.Type)
 		if err != nil {
-			return nil, fmt.Errorf("encode: bad output type: %v", err)
+			return nil, fmt.Errorf("failed to encode userfn %v, bad output type: %v", edge, err)
 		}
 		ret.Outbound = append(ret.Outbound, &v1.MultiEdge_Outbound{Type: t})
 	}
@@ -91,7 +91,7 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 		var err error
 		u, err = decodeFn(edge.Fn)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("decode: bad userfn: %v", err)
+			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad function: %v", edge, err)
 		}
 	}
 	if edge.WindowFn != nil {
@@ -100,18 +100,18 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 	for _, in := range edge.Inbound {
 		kind, err := decodeInputKind(in.Kind)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("decode: bad input kind: %v", err)
+			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input kind: %v", edge, err)
 		}
 		t, err := decodeFullType(in.Type)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("decode: bad input type: %v", err)
+			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input type: %v", edge, err)
 		}
 		inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t})
 	}
 	for _, out := range edge.Outbound {
 		t, err := decodeFullType(out.Type)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("decode: bad output type: %v", err)
+			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad output type: %v", edge, err)
 		}
 		outbound = append(outbound, &graph.Outbound{Type: t})
 	}
@@ -122,15 +122,15 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 	t, err := encodeType(c.Type)
 	if err != nil {
-		return nil, fmt.Errorf("bad underlying type: %v", err)
+		return nil, fmt.Errorf("failed to encode custom coder %v for type %v: %v", c, c.Type, err)
 	}
 	enc, err := encodeUserFn(c.Enc)
 	if err != nil {
-		return nil, fmt.Errorf("bad enc: %v", err)
+		return nil, fmt.Errorf("failed to encode custom coder %v, bad encoding function: %v", c, err)
 	}
 	dec, err := encodeUserFn(c.Dec)
 	if err != nil {
-		return nil, fmt.Errorf("bad dec: %v", err)
+		return nil, fmt.Errorf("failed to encode custom coder %v, bad decoding function: %v", c, err)
 	}
 
 	ret := &v1.CustomCoder{
@@ -145,20 +145,20 @@ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
 	t, err := decodeType(c.Type)
 	if err != nil {
-		return nil, fmt.Errorf("decodeCustomCoder bad type: %v", err)
+		return nil, fmt.Errorf("failed to decode custom coder %v for type %v: %v", c, c.Type, err)
 	}
 	enc, err := decodeUserFn(c.Enc)
 	if err != nil {
-		return nil, fmt.Errorf("decodeCustomCoder bad encoder: %v", err)
+		return nil, fmt.Errorf("failed to decode custom coder %v, bad encoding function: %v", c, err)
 	}
 	dec, err := decodeUserFn(c.Dec)
 	if err != nil {
-		return nil, fmt.Errorf("decodeCustomCoder bad decoder: %v", err)
+		return nil, fmt.Errorf("failed to decode custom coder %v, bad decoding function: %v", c, err)
 	}
 
 	ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
 	if err != nil {
-		return nil, fmt.Errorf("decodeCustomCoder: %v", err)
+		return nil, fmt.Errorf("failed to decode custom coder %v: %v", c, err)
 	}
 	return ret, nil
 }
@@ -195,7 +195,7 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 		gen := reflectx.FunctionName(u.DynFn.Gen)
 		t, err := encodeType(u.DynFn.T)
 		if err != nil {
-			return nil, fmt.Errorf("bad function type: %v", err)
+			return nil, fmt.Errorf("failed to encode dynamic DoFn %v, bad function type: %v", u, err)
 		}
 		return &v1.Fn{Dynfn: &v1.DynFn{
 			Name: u.DynFn.Name,
@@ -207,7 +207,7 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 	case u.Fn != nil:
 		fn, err := encodeUserFn(u.Fn)
 		if err != nil {
-			return nil, fmt.Errorf("bad userfn: %v", err)
+			return nil, fmt.Errorf("failed to encode DoFn %v, bad userfn: %v", u, err)
 		}
 		return &v1.Fn{Fn: fn}, nil
 
@@ -215,24 +215,24 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 		t := reflect.TypeOf(u.Recv)
 		k, ok := runtime.TypeKey(reflectx.SkipPtr(t))
 		if !ok {
-			return nil, fmt.Errorf("bad recv: %v", u.Recv)
+			return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to create TypeKey for receiver type %T", u, u.Recv)
 		}
 		if _, ok := runtime.LookupType(k); !ok {
-			return nil, fmt.Errorf("recv type must be registered: %v", t)
+			return nil, fmt.Errorf("failed to encode structural DoFn %v, receiver type %v must be registered", u, t)
 		}
 		typ, err := encodeType(t)
 		if err != nil {
-			panic(fmt.Sprintf("bad recv type: %v", u.Recv))
+			panic(fmt.Sprintf("Failed to encode structural DoFn %v, failed to encode receiver type %T: %v", u, u.Recv, err))
 		}
 
 		data, err := json.Marshal(u.Recv)
 		if err != nil {
-			return nil, fmt.Errorf("bad userfn: %v", err)
+			return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to marshal receiver %v: %v", u, u.Recv, err)
 		}
 		return &v1.Fn{Type: typ, Opt: string(data)}, nil
 
 	default:
-		panic("empty Fn")
+		panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
 	}
 }
 
@@ -240,12 +240,12 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 	if u.Dynfn != nil {
 		gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType)
 		if err != nil {
-			return nil, fmt.Errorf("bad symbol %v: %v", u.Dynfn.Gen, err)
+			return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad symbol %v: %v", u, u.Dynfn.Gen, err)
 		}
 
 		t, err := decodeType(u.Dynfn.Type)
 		if err != nil {
-			return nil, fmt.Errorf("bad type: %v", err)
+			return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad type: %v", u, err)
 		}
 		return graph.NewFn(&graph.DynFn{
 			Name: u.Dynfn.Name,
@@ -257,22 +257,22 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 	if u.Fn != nil {
 		fn, err := decodeUserFn(u.Fn)
 		if err != nil {
-			return nil, fmt.Errorf("bad userfn: %v", err)
+			return nil, fmt.Errorf("failed to decode DoFn %v, failed to decode userfn: %v", u, err)
 		}
 		fx, err := funcx.New(reflectx.MakeFunc(fn))
 		if err != nil {
-			return nil, fmt.Errorf("bad userfn: %v", err)
+			return nil, fmt.Errorf("failed to decode DoFn %v, failed to construct userfn: %v", u, err)
 		}
 		return &graph.Fn{Fn: fx}, nil
 	}
 
 	t, err := decodeType(u.Type)
 	if err != nil {
-		return nil, fmt.Errorf("bad type: %v", err)
+		return nil, fmt.Errorf("failed to decode structural DoFn %v, bad type: %v", u, err)
 	}
 	fn, err := reflectx.UnmarshalJSON(t, u.Opt)
 	if err != nil {
-		return nil, fmt.Errorf("bad struct encoding: %v", err)
+		return nil, fmt.Errorf("failed to decode structural DoFn %v, bad struct encoding: %v", u, err)
 	}
 	return graph.NewFn(fn)
 }
@@ -286,7 +286,7 @@ func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
 	symbol := u.Fn.Name()
 	t, err := encodeType(u.Fn.Type())
 	if err != nil {
-		return nil, fmt.Errorf("encodeUserFn: bad function type: %v", err)
+		return nil, fmt.Errorf("failed to encode userfn %v, bad function type: %v", u, err)
 	}
 	return &v1.UserFn{Name: symbol, Type: t}, nil
 }
@@ -319,7 +319,7 @@ func encodeFullType(t typex.FullType) (*v1.FullType, error) {
 
 	prim, err := encodeType(t.Type())
 	if err != nil {
-		return nil, fmt.Errorf("bad type: %v", err)
+		return nil, fmt.Errorf("failed to encode full type %v, bad type: %v", t, err)
 	}
 	return &v1.FullType{Type: prim, Components: components}, nil
 }
@@ -336,7 +336,7 @@ func decodeFullType(t *v1.FullType) (typex.FullType, error) {
 
 	prim, err := decodeType(t.Type)
 	if err != nil {
-		return nil, fmt.Errorf("bad type: %v", err)
+		return nil, fmt.Errorf("failed to decode full type %v, bad type: %v", t, err)
 	}
 	return typex.New(prim, components...), nil
 }
@@ -388,7 +388,7 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Slice:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to encode slice %v, bad element type: %v", t, err)
 		}
 		return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil
 
@@ -399,7 +399,7 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 
 			fType, err := encodeType(f.Type)
 			if err != nil {
-				return nil, fmt.Errorf("bad field type: %v", err)
+				return nil, fmt.Errorf("failed to encode struct %v, bad field type: %v", t, err)
 			}
 
 			field := &v1.Type_StructField{
@@ -420,7 +420,7 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 		for i := 0; i < t.NumIn(); i++ {
 			param, err := encodeType(t.In(i))
 			if err != nil {
-				return nil, fmt.Errorf("bad parameter type: %v", err)
+				return nil, fmt.Errorf("failed to encode function %v, bad parameter type: %v", t, err)
 			}
 			in = append(in, param)
 		}
@@ -428,7 +428,7 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 		for i := 0; i < t.NumOut(); i++ {
 			ret, err := encodeType(t.Out(i))
 			if err != nil {
-				return nil, fmt.Errorf("bad return type: %v", err)
+				return nil, fmt.Errorf("failed to encode function %v, bad return type: %v", t, err)
 			}
 			out = append(out, ret)
 		}
@@ -437,7 +437,7 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Chan:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to encode channel %v, bad element type: %v", t, err)
 		}
 		dir := encodeChanDir(t.ChanDir())
 		return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: dir}, nil
@@ -445,12 +445,12 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Ptr:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to encode pointer %v, bad base type: %v", t, err)
 		}
 		return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil
 
 	default:
-		return nil, fmt.Errorf("unencodable type: %v", t)
+		return nil, fmt.Errorf("unencodable type %v", t)
 	}
 }
 
@@ -496,7 +496,7 @@ func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, bool) {
 
 func decodeType(t *v1.Type) (reflect.Type, error) {
 	if t == nil {
-		return nil, fmt.Errorf("empty type")
+		return nil, fmt.Errorf("failed to decode type %v, empty type", t)
 	}
 
 	switch t.Kind {
@@ -532,7 +532,7 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 	case v1.Type_SLICE:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
 		}
 		return reflect.SliceOf(elm), nil
 
@@ -541,7 +541,7 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 		for _, f := range t.Fields {
 			fType, err := decodeType(f.Type)
 			if err != nil {
-				return nil, fmt.Errorf("bad field type: %v", err)
+				return nil, fmt.Errorf("failed to decode type %v, bad field type: %v", t, err)
 			}
 
 			field := reflect.StructField{
@@ -560,48 +560,48 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 	case v1.Type_FUNC:
 		in, err := decodeTypes(t.GetParameterTypes())
 		if err != nil {
-			return nil, fmt.Errorf("bad parameter type: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad parameter type: %v", t, err)
 		}
 		out, err := decodeTypes(t.GetReturnTypes())
 		if err != nil {
-			return nil, fmt.Errorf("bad return type: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad return type: %v", t, err)
 		}
 		return reflect.FuncOf(in, out, t.GetIsVariadic()), nil
 
 	case v1.Type_CHAN:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
 		}
 		dir, err := decodeChanDir(t.GetChanDir())
 		if err != nil {
-			return nil, fmt.Errorf("bad ChanDir: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad channel direction: %v", t, err)
 		}
 		return reflect.ChanOf(dir, elm), nil
 
 	case v1.Type_PTR:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
 		}
 		return reflect.PtrTo(elm), nil
 
 	case v1.Type_SPECIAL:
 		ret, err := decodeSpecial(t.Special)
 		if err != nil {
-			return nil, fmt.Errorf("bad element: %v", err)
+			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
 		}
 		return ret, nil
 
 	case v1.Type_EXTERNAL:
 		ret, ok := runtime.LookupType(t.ExternalKey)
 		if !ok {
-			return nil, fmt.Errorf("external key not found: %v", t.ExternalKey)
+			return nil, fmt.Errorf("failed to decode type %v, external key not found %v", t, t.ExternalKey)
 		}
 		return ret, nil
 
 	default:
-		return nil, fmt.Errorf("unexpected type kind: %v", t.Kind)
+		return nil, fmt.Errorf("failed to decode type %v, unexpected type kind %v", t, t.Kind)
 	}
 }
 
@@ -641,7 +641,7 @@ func decodeSpecial(s v1.Type_Special) (reflect.Type, error) {
 		return typex.ZType, nil
 
 	default:
-		return nil, fmt.Errorf("unknown special type: %v", s)
+		return nil, fmt.Errorf("failed to decode special type, unknown type %v", s)
 	}
 }
 
@@ -682,7 +682,7 @@ func encodeChanDir(dir reflect.ChanDir) v1.Type_ChanDir {
 	case reflect.BothDir:
 		return v1.Type_BOTH
 	default:
-		panic(fmt.Sprintf("unexpected reflect.ChanDir: %v", dir))
+		panic(fmt.Sprintf("Failed to encode channel direction, invalid value: %v", dir))
 	}
 }
 
@@ -695,7 +695,7 @@ func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, error) {
 	case v1.Type_BOTH:
 		return reflect.BothDir, nil
 	default:
-		return reflect.BothDir, fmt.Errorf("invalid chan dir: %v", dir)
+		return reflect.BothDir, fmt.Errorf("failed to decode channel direction, invalid value: %v", dir)
 	}
 }
 
@@ -716,7 +716,7 @@ func encodeInputKind(k graph.InputKind) v1.MultiEdge_Inbound_InputKind {
 	case graph.ReIter:
 		return v1.MultiEdge_Inbound_REITER
 	default:
-		panic(fmt.Sprintf("unexpected input: %v", k))
+		panic(fmt.Sprintf("Failed to encode input kind, invalid value: %v", k))
 	}
 }
 
@@ -737,6 +737,6 @@ func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) (graph.InputKind, error)
 	case v1.MultiEdge_Inbound_REITER:
 		return graph.ReIter, nil
 	default:
-		return graph.Main, fmt.Errorf("invalid input kind: %v", k)
+		return graph.Main, fmt.Errorf("failed to decode input kind, invalid value: %v", k)
 	}
 }