You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/07 19:59:40 UTC

[GitHub] [beam] lostluck commented on a change in pull request #13028: [BEAM-8017] Plumb errors and remove panics from package graphx

lostluck commented on a change in pull request #13028:
URL: https://github.com/apache/beam/pull/13028#discussion_r501236492



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
##########
@@ -107,7 +107,11 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			coders, err := graphx.UnmarshalCoders(graphx.MarshalCoders([]*coder.Coder{test.c}))
+			ids, marshalCoders, err := graphx.MarshalCoders([]*coder.Coder{test.c})
+			if err != nil {
+				t.Fatalf("Marshal(Marshal(%v)) failed: %v", test.c, err)

Review comment:
       ```suggestion
   				t.Fatalf("Marshal(%v) failed: %v", test.c, err)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -184,18 +190,26 @@ func (m *marshaller) getRequirements() []string {
 	return reqs
 }
 
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
 	id := scopeID(s.Scope.Scope)
 	if _, exists := m.transforms[id]; exists {
-		return id
+		return id, nil
 	}
 
 	var subtransforms []string
 	for _, edge := range s.Edges {
-		subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
+		ids, err := m.addMultiEdge(edge)
+		if err != nil {
+			return "", errors.Wrapf(err, "Failed to add scope tree: %v", s)

Review comment:
       ```suggestion
   			return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -204,60 +218,90 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		EnvironmentId: m.addDefaultEnv(),
 	}
 
-	m.updateIfCombineComposite(s, transform)
+	if err := m.updateIfCombineComposite(s, transform); err != nil {
+		return "", errors.Wrapf(err, "Failed to add socpe tree: %v", s)
+	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
 // updateIfCombineComposite examines the scope tree and sets the PTransform Spec
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
 	if s.Scope.Name != graph.CombinePerKeyScope ||
 		len(s.Edges) != 2 ||
 		len(s.Edges[0].Edge.Input) != 1 ||
 		len(s.Edges[1].Edge.Output) != 1 ||
 		s.Edges[1].Edge.Op != graph.Combine {
-		return
+		return nil
 	}
 
 	edge := s.Edges[1].Edge
-	acID := m.coders.Add(edge.AccumCoder)
+	acID, err := m.coders.Add(edge.AccumCoder)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to update PTransform spec: %v", transform)
+	}
+	mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to update PTransform spec: %v", transform)

Review comment:
       ```suggestion
   		return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -204,60 +218,90 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		EnvironmentId: m.addDefaultEnv(),
 	}
 
-	m.updateIfCombineComposite(s, transform)
+	if err := m.updateIfCombineComposite(s, transform); err != nil {
+		return "", errors.Wrapf(err, "Failed to add socpe tree: %v", s)
+	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
 // updateIfCombineComposite examines the scope tree and sets the PTransform Spec
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
 	if s.Scope.Name != graph.CombinePerKeyScope ||
 		len(s.Edges) != 2 ||
 		len(s.Edges[0].Edge.Input) != 1 ||
 		len(s.Edges[1].Edge.Output) != 1 ||
 		s.Edges[1].Edge.Op != graph.Combine {
-		return
+		return nil
 	}
 
 	edge := s.Edges[1].Edge
-	acID := m.coders.Add(edge.AccumCoder)
+	acID, err := m.coders.Add(edge.AccumCoder)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to update PTransform spec: %v", transform)
+	}
+	mustEncodeMultiEdge, err := mustEncodeMultiEdgeBase64(edge)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to update PTransform spec: %v", transform)
+	}
 	payload := &pipepb.CombinePayload{
 		CombineFn: &pipepb.FunctionSpec{
 			Urn:     URNDoFn,
-			Payload: []byte(mustEncodeMultiEdgeBase64(edge)),
+			Payload: []byte(mustEncodeMultiEdge),
 		},
 		AccumulatorCoderId: acID,
 	}
 	transform.Spec = &pipepb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
+	return nil
+}
+
+func newAddMultiEdgeError(err error, edge NamedEdge) error {
+	return errors.Wrapf(err, "Failed to add input kind: %v", edge)

Review comment:
       ```suggestion
   	return errors.Wrapf(err, "failed to add input kind: %v", edge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
##########
@@ -70,37 +69,45 @@ const (
 )
 
 // MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		return nil, errors.Errorf("Failed to make KV Union coder. Expected CoGBK, got %v", gbk)
 	}
 
 	from := gbk.Input[0].From
 	key := from.Coder.Components[0]
-	return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to make KV Union coder.")
+	}
+	return coder.NewKV([]*coder.Coder{key, kvCoder}), nil
 }
 
 // MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
-func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		return nil, errors.Errorf("Failed to make GBK Union coder. Expected CoGBK, got %v", gbk)
 	}
 
 	from := gbk.Input[0].From
 	key := from.Coder.Components[0]
-	return coder.NewCoGBK([]*coder.Coder{key, makeUnionCoder()})
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to make GBK Union coder.")

Review comment:
       ```suggestion
   		return nil, errors.Wrapf(err, "failed to make GBK Union coder.")
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/serialize.go
##########
@@ -252,7 +255,7 @@ func encodeFn(u *graph.Fn) (*v1pb.Fn, error) {
 		return &v1pb.Fn{Type: typ, Opt: string(data)}, nil
 
 	default:
-		panic(fmt.Sprintf("Failed to encode DoFn %v, missing fn", u))
+		return nil, errors.Errorf("Failed to encode DoFn %v, missing fn", u)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("failed to encode DoFn %v, missing fn", u)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -204,60 +218,90 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		EnvironmentId: m.addDefaultEnv(),
 	}
 
-	m.updateIfCombineComposite(s, transform)
+	if err := m.updateIfCombineComposite(s, transform); err != nil {
+		return "", errors.Wrapf(err, "Failed to add socpe tree: %v", s)
+	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
 // updateIfCombineComposite examines the scope tree and sets the PTransform Spec
 // to be a CombinePerKey with a CombinePayload if it's a liftable composite.
 // Beam Portability requires that composites contain an implementation for runners
 // that don't understand the URN and Payload, which this lightly checks for.
-func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) {
+func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
 	if s.Scope.Name != graph.CombinePerKeyScope ||
 		len(s.Edges) != 2 ||
 		len(s.Edges[0].Edge.Input) != 1 ||
 		len(s.Edges[1].Edge.Output) != 1 ||
 		s.Edges[1].Edge.Op != graph.Combine {
-		return
+		return nil
 	}
 
 	edge := s.Edges[1].Edge
-	acID := m.coders.Add(edge.AccumCoder)
+	acID, err := m.coders.Add(edge.AccumCoder)
+	if err != nil {
+		return errors.Wrapf(err, "Failed to update PTransform spec: %v", transform)

Review comment:
       ```suggestion
   		return errors.Wrapf(err, "failed to update PTransform spec: %v", transform)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
##########
@@ -70,37 +69,45 @@ const (
 )
 
 // MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		return nil, errors.Errorf("Failed to make KV Union coder. Expected CoGBK, got %v", gbk)
 	}
 
 	from := gbk.Input[0].From
 	key := from.Coder.Components[0]
-	return coder.NewKV([]*coder.Coder{key, makeUnionCoder()})
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to make KV Union coder.")
+	}
+	return coder.NewKV([]*coder.Coder{key, kvCoder}), nil
 }
 
 // MakeGBKUnionCoder returns CoGBK<K,KV<int,[]byte>> for a given CoGBK.
-func MakeGBKUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeGBKUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		return nil, errors.Errorf("Failed to make GBK Union coder. Expected CoGBK, got %v", gbk)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("failed to make GBK Union coder. Expected CoGBK, got %v", gbk)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/cogbk.go
##########
@@ -70,37 +69,45 @@ const (
 )
 
 // MakeKVUnionCoder returns KV<K,KV<int,[]byte>> for a given CoGBK.
-func MakeKVUnionCoder(gbk *graph.MultiEdge) *coder.Coder {
+func MakeKVUnionCoder(gbk *graph.MultiEdge) (*coder.Coder, error) {
 	if gbk.Op != graph.CoGBK {
-		panic(fmt.Sprintf("expected CoGBK, got %v", gbk))
+		return nil, errors.Errorf("Failed to make KV Union coder. Expected CoGBK, got %v", gbk)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("failed to make KV Union coder. Expected CoGBK, got %v", gbk)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -77,12 +77,12 @@ func goCapabilities() []string {
 }
 
 // CreateEnvironment produces the appropriate payload for the type of environment.
-func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) *pipepb.Environment {
+func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) {
 	var serializedPayload []byte
 	switch urn {
 	case "beam:env:process:v1":
 		// TODO Support process based SDK Harness.
-		panic(fmt.Sprintf("Unsupported environment %v", urn))
+		return nil, errors.Errorf("Unsupported environment %v", urn)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("unsupported environment %v", urn)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -204,60 +218,90 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 		EnvironmentId: m.addDefaultEnv(),
 	}
 
-	m.updateIfCombineComposite(s, transform)
+	if err := m.updateIfCombineComposite(s, transform); err != nil {
+		return "", errors.Wrapf(err, "Failed to add socpe tree: %v", s)

Review comment:
       ```suggestion
   		return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -184,18 +190,26 @@ func (m *marshaller) getRequirements() []string {
 	return reqs
 }
 
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
 	id := scopeID(s.Scope.Scope)
 	if _, exists := m.transforms[id]; exists {
-		return id
+		return id, nil
 	}
 
 	var subtransforms []string
 	for _, edge := range s.Edges {
-		subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
+		ids, err := m.addMultiEdge(edge)
+		if err != nil {
+			return "", errors.Wrapf(err, "Failed to add scope tree: %v", s)
+		}
+		subtransforms = append(subtransforms, ids...)
 	}
 	for _, tree := range s.Children {
-		subtransforms = append(subtransforms, m.addScopeTree(tree))
+		id, err := m.addScopeTree(tree)
+		if err != nil {
+			return "", errors.Wrapf(err, "Failed to add scope tree: %v", s)

Review comment:
       ```suggestion
   			return "", errors.Wrapf(err, "failed to add scope tree: %v", s)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -384,17 +452,19 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 	}
 	m.transforms[id] = transform
 	allPIds = append(allPIds, id)
-	return allPIds
+	return allPIds, nil
 }
 
-func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
+func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) (string, error) {
 	edge := namedEdge.Edge
 	id := edgeID(edge)
 
 	inputs := make(map[string]string)
 
 	for tag, n := range ExternalInputs(edge) {
-		m.addNode(n)
+		if _, err := m.addNode(n); err != nil {
+			return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)

Review comment:
       ```suggestion
   			return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -420,33 +490,58 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
 		// map consumers of these outputs to the expanded transform's outputs.
 		outputs := make(map[string]string)
 		for i, out := range edge.Output {
-			m.addNode(out.To)
+			if _, err := m.addNode(out.To); err != nil {
+				return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)

Review comment:
       ```suggestion
   				return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -809,32 +965,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					GapSize: ptypes.DurationProto(w.Gap),
 				},
 			),
-		}
+		}, nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("Unexpected windowing strategy: %v", w)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("unexpected windowing strategy: %v", w)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -569,36 +670,59 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
 // User code is able to write reshards, but it's easier to access
 // the window coders framework side, which is critical for the reshard
 // to function with unbounded inputs.
-func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) {
 	id := edgeID(edge.Edge)
-	var kvCoderID, gbkCoderID string
-	{
-		kv := makeUnionCoder()
-		kvCoderID = m.coders.Add(kv)
-		gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+	kvCoder, err := makeUnionCoder()
+	if err != nil {
+		return "", errors.Wrapf(err, "Fail to expand Reshuffle transform for edge: %v", edge)

Review comment:
       Please use the same anonymous helper function approach here (as suggested for expandCoGBK) to reduce the repeated error-wrapping return lines.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -809,32 +965,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					GapSize: ptypes.DurationProto(w.Gap),
 				},
 			),
-		}
+		}, nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("Unexpected windowing strategy: %v", w)
 	}
 }
 
-func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
+func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) {
 	switch w.Kind {
 	case window.GlobalWindows:
-		return coder.NewGlobalWindow()
+		return coder.NewGlobalWindow(), nil
 	case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn:
-		return coder.NewIntervalWindow()
+		return coder.NewIntervalWindow(), nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("Unexpected windowing strategy: %v", w)

Review comment:
       ```suggestion
   		return nil, errors.Errorf("unexpected windowing strategy: %v", w)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -358,16 +421,21 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 		spec = &pipepb.FunctionSpec{Urn: URNGBK}
 
 	case graph.WindowInto:
+		windowFn, err := makeWindowFn(edge.Edge.WindowFn)
+		if err != nil {
+			return nil, newAddMultiEdgeError(err, edge)
+		}
 		payload := &pipepb.WindowIntoPayload{
-			WindowFn: makeWindowFn(edge.Edge.WindowFn),
+			WindowFn: windowFn,
 		}
 		spec = &pipepb.FunctionSpec{Urn: URNWindow, Payload: protox.MustEncode(payload)}
 
 	case graph.External:
 		spec = &pipepb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: edge.Edge.Payload.Data}
 
 	default:
-		panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
+		err := errors.Errorf("Unexpected opcode: %v", edge.Edge.Op)

Review comment:
       ```suggestion
   		err := errors.Errorf("unexpected opcode: %v", edge.Edge.Op)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -322,31 +372,44 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 				}
 
 			case graph.Map, graph.MultiMap:
-				panic("NYI")
+				return nil, errors.Errorf("Not implemented")

Review comment:
       ```suggestion
   				return nil, errors.Errorf("not implemented")
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -420,33 +490,58 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
 		// map consumers of these outputs to the expanded transform's outputs.
 		outputs := make(map[string]string)
 		for i, out := range edge.Output {
-			m.addNode(out.To)
+			if _, err := m.addNode(out.To); err != nil {
+				return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)
+			}
 			outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
 		}
 		transform.Outputs = outputs
-		transform.EnvironmentId = ExpandedTransform(edge.External.Expanded).EnvironmentId
+		environment, err := ExpandedTransform(edge.External.Expanded)
+		if err != nil {
+			return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)
+		}
+		transform.EnvironmentId = environment.EnvironmentId
 	}
 
 	m.transforms[id] = transform
-	return id
+	return id, nil
 }
 
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
 	// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have to translate
 	// CoGBK with multiple PCollections as described in cogbk.go.
 
 	id := edgeID(edge.Edge)
-	kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
-	gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+	kvCoder, err := MakeKVUnionCoder(edge.Edge)
+	if err != nil {
+		return "", errors.Wrapf(err, "Fail to expand CoGBK transform for edge: %v", edge)

Review comment:
       There's a lot of repetition in this function around the error handler. We can reduce that.
   
   Make an anonymous helper function at the top of the function:
   ```
   handleErr := func(err error) (string, error) {
   		return "", errors.Wrapf(err, "failed to expand CoGBK transform for edge: %v", edge)
   }
   ```
   And then replace the repeated return lines with:
   ```
     return handleErr(err)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang_test.go
##########
@@ -156,7 +156,11 @@ func TestExpandedTransform(t *testing.T) {
 		want := newTransform("x")
 		exp := &graph.ExpandedTransform{Transform: want}
 
-		got := ExpandedTransform(exp)
+		got, err := ExpandedTransform(exp)
+
+		if err != nil {
+			t.Fatal(err)
+		}

Review comment:
       ```suggestion
   		got, err := ExpandedTransform(exp)
   		if err != nil {
   			t.Fatal(err)
   		}
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -699,27 +828,38 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) string {
 		},
 		EnvironmentId: m.addDefaultEnv(),
 	}
-	return reshuffleID
+	return reshuffleID, nil
 }
 
-func (m *marshaller) addNode(n *graph.Node) string {
+func (m *marshaller) addNode(n *graph.Node) (string, error) {
 	id := nodeID(n)
 	if _, exists := m.pcollections[id]; exists {
-		return id
+		return id, nil
 	}
 	// TODO(herohde) 11/15/2017: expose UniqueName to user.
-	return m.makeNode(id, m.coders.Add(n.Coder), n)
+	cid, err := m.coders.Add(n.Coder)
+	if err != nil {
+		return "", err
+	}
+	return m.makeNode(id, cid, n)
 }
 
-func (m *marshaller) makeNode(id, cid string, n *graph.Node) string {
+func (m *marshaller) makeNode(id, cid string, n *graph.Node) (string, error) {
+
+	windowingStrategyId, err := m.addWindowingStrategy(n.WindowingStrategy())
+

Review comment:
       ```suggestion
   	windowingStrategyId, err := m.addWindowingStrategy(n.WindowingStrategy())
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -809,32 +965,32 @@ func makeWindowFn(w *window.Fn) *pipepb.FunctionSpec {
 					GapSize: ptypes.DurationProto(w.Gap),
 				},
 			),
-		}
+		}, nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("Unexpected windowing strategy: %v", w)
 	}
 }
 
-func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
+func makeWindowCoder(w *window.Fn) (*coder.WindowCoder, error) {
 	switch w.Kind {
 	case window.GlobalWindows:
-		return coder.NewGlobalWindow()
+		return coder.NewGlobalWindow(), nil
 	case window.FixedWindows, window.SlidingWindows, URNSlidingWindowsWindowFn:
-		return coder.NewIntervalWindow()
+		return coder.NewIntervalWindow(), nil
 	default:
-		panic(fmt.Sprintf("Unexpected windowing strategy: %v", w))
+		return nil, errors.Errorf("Unexpected windowing strategy: %v", w)
 	}
 }
 
-func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
+func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) (string, error) {
 	ref, err := EncodeMultiEdge(edge)
 	if err != nil {
-		panic(errors.Wrapf(err, "Failed to serialize %v", edge))
+		return "", errors.Wrapf(err, "Failed to serialize %v", edge)

Review comment:
       ```suggestion
   		return "", errors.Wrapf(err, "failed to serialize %v", edge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -322,31 +372,44 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 				}
 
 			case graph.Map, graph.MultiMap:
-				panic("NYI")
+				return nil, errors.Errorf("Not implemented")
 
 			default:
-				panic(fmt.Sprintf("unexpected input kind: %v", edge))
+				return nil, errors.Errorf("Unexpected input kind: %v", edge)

Review comment:
       ```suggestion
   				return nil, errors.Errorf("unexpected input kind: %v", edge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -420,33 +490,58 @@ func (m *marshaller) expandCrossLanguage(namedEdge NamedEdge) string {
 		// map consumers of these outputs to the expanded transform's outputs.
 		outputs := make(map[string]string)
 		for i, out := range edge.Output {
-			m.addNode(out.To)
+			if _, err := m.addNode(out.To); err != nil {
+				return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)
+			}
 			outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
 		}
 		transform.Outputs = outputs
-		transform.EnvironmentId = ExpandedTransform(edge.External.Expanded).EnvironmentId
+		environment, err := ExpandedTransform(edge.External.Expanded)
+		if err != nil {
+			return "", errors.Wrapf(err, "Failed to expand cross language transform for edge: %v", namedEdge)

Review comment:
       ```suggestion
   			return "", errors.Wrapf(err, "failed to expand cross language transform for edge: %v", namedEdge)
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -737,10 +877,14 @@ func (m *marshaller) addDefaultEnv() string {
 	return id
 }
 
-func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) string {
-	ws := marshalWindowingStrategy(m.coders, w)
+func (m *marshaller) addWindowingStrategy(w *window.WindowingStrategy) (string, error) {
+	ws, err := marshalWindowingStrategy(m.coders, w)
+	if err != nil {
+		return "", errors.Wrapf(err, "Failed to add window strategy %v", w)
+	}
 	ws.EnvironmentId = m.addDefaultEnv()
-	return m.internWindowingStrategy(ws)
+	return m.internWindowingStrategy(ws), nil
+
 }

Review comment:
       ```suggestion
   	return m.internWindowingStrategy(ws), nil
   }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org