You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/24 02:48:05 UTC

[beam] branch master updated: [BEAM-7086] Refactoring repetitive error code.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aef01da  [BEAM-7086] Refactoring repetitive error code.
     new 5556467  Merge pull request #8674 from youngoli/beam7086
aef01da is described below

commit aef01da787f6c92bf8d7491dfa89ac43cbe85ace
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Thu May 23 15:46:19 2019 -0700

    [BEAM-7086] Refactoring repetitive error code.
    
    Straightforward change: add local function literals (addContext) that
    wrap errors with the context message that was being repeated at each return.
---
 sdks/go/pkg/beam/core/graph/bind.go | 30 ++++++++++++----------
 sdks/go/pkg/beam/core/graph/edge.go | 51 ++++++++++++++++++-------------------
 2 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go
index 77a6d69..f5cc7f8 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -60,30 +60,34 @@ import (
 //
 // Here, the inbound shape and output types are different from before.
 func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) {
+	addContext := func(err error, fn *funcx.Fn) error {
+		return errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+	}
+
 	inbound, kinds, err := findInbound(fn, in...)
 	if err != nil {
-		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+		return nil, nil, nil, nil, addContext(err, fn)
 	}
 	outbound, err := findOutbound(fn)
 	if err != nil {
-		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+		return nil, nil, nil, nil, addContext(err, fn)
 	}
 
 	subst, err := typex.Bind(inbound, in)
 	if err != nil {
-		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+		return nil, nil, nil, nil, addContext(err, fn)
 	}
 	for k, v := range typedefs {
 		if substK, exists := subst[k]; exists {
 			err := errors.Errorf("cannot substitute type %v with %v, already defined as %v", k, v, substK)
-			return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+			return nil, nil, nil, nil, addContext(err, fn)
 		}
 		subst[k] = v
 	}
 
 	out, err := typex.Substitute(outbound, subst)
 	if err != nil {
-		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+		return nil, nil, nil, nil, addContext(err, fn)
 	}
 	return inbound, kinds, outbound, out, nil
 }
@@ -128,6 +132,9 @@ func returnTypes(list []funcx.ReturnParam) []reflect.Type {
 
 func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputKind, error) {
 	// log.Printf("Bind inbound: %v %v", fn, in)
+	addContext := func(err error, p []funcx.FnParam, in interface{}) error {
+		return errors.WithContextf(err, "binding params %v to input %v", p, in)
+	}
 
 	var inbound []typex.FullType
 	var kinds []InputKind
@@ -136,29 +143,26 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputK
 	for _, input := range in {
 		arity, err := inboundArity(input, index == 0)
 		if err != nil {
-			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params, input)
+			return nil, nil, addContext(err, params, input)
 		}
 		if len(params)-index < arity {
-			err := errors.New("too few params")
-			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params[index:], input)
+			return nil, nil, addContext(errors.New("too few params"), params[index:], input)
 		}
 
 		paramsToBind := params[index : index+arity]
 		elm, kind, err := tryBindInbound(input, paramsToBind, index == 0)
 		if err != nil {
-			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", paramsToBind, input)
+			return nil, nil, addContext(err, paramsToBind, input)
 		}
 		inbound = append(inbound, elm)
 		kinds = append(kinds, kind)
 		index += arity
 	}
 	if index < len(params) {
-		err := errors.New("too few inputs: forgot an input or to annotate options?")
-		return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
+		return nil, nil, addContext(errors.New("too few inputs: forgot an input or to annotate options?"), params, in)
 	}
 	if index > len(params) {
-		err := errors.New("too many inputs")
-		return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
+		return nil, nil, addContext(errors.New("too many inputs"), params, in)
 	}
 	return inbound, kinds, nil
 }
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 846dceb..dcdfd0b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -186,14 +186,15 @@ func (e *MultiEdge) String() string {
 
 // NewCoGBK inserts a new CoGBK edge into the graph.
 func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
+	addContext := func(err error, s *Scope) error {
+		return errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+	}
+
 	if len(ns) == 0 {
-		// TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file.
-		err := errors.New("needs at least 1 input")
-		return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+		return nil, addContext(errors.New("needs at least 1 input"), s)
 	}
 	if !typex.IsKV(ns[0].Type()) {
-		err := errors.Errorf("input type must be KV: %v", ns[0])
-		return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+		return nil, addContext(errors.Errorf("input type must be KV: %v", ns[0]), s)
 	}
 
 	// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>.
@@ -206,20 +207,16 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
 	for i := 1; i < len(ns); i++ {
 		n := ns[i]
 		if !typex.IsKV(n.Type()) {
-			err := errors.Errorf("input type must be KV: %v", n)
-			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+			return nil, addContext(errors.Errorf("input type must be KV: %v", n), s)
 		}
 		if !n.Coder.Components[0].Equals(c) {
-			err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c)
-			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+			return nil, addContext(errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c), s)
 		}
 		if !w.Equals(n.WindowingStrategy()) {
-			err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w)
-			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+			return nil, addContext(errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w), s)
 		}
 		if bounded != n.Bounded() {
-			err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded)
-			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+			return nil, addContext(errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded), s)
 		}
 
 		comp = append(comp, n.Type().Components()[1])
@@ -242,9 +239,12 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
 // NewFlatten inserts a new Flatten edge in the graph. Flatten output type is
 // the shared input type.
 func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
+	addContext := func(err error, s *Scope) error {
+		return errors.WithContextf(err, "creating new Flatten in scope %v", s)
+	}
+
 	if len(in) < 2 {
-		err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in))
-		return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+		return nil, addContext(errors.Errorf("Flatten needs at least 2 input, got %v", len(in)), s)
 	}
 	t := in[0].Type()
 	w := inputWindow(in)
@@ -260,17 +260,14 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 	}
 	for _, n := range in {
 		if !typex.IsEqual(t, n.Type()) {
-			err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t)
-			return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+			return nil, addContext(errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t), s)
 		}
 		if !w.Equals(n.WindowingStrategy()) {
-			err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w)
-			return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+			return nil, addContext(errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w), s)
 		}
 	}
 	if typex.IsCoGBK(t) {
-		err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t)
-		return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+		return nil, addContext(errors.Errorf("Flatten input type cannot be CoGBK: %v", t), s)
 	}
 
 	edge := g.NewEdge(s)
@@ -338,14 +335,16 @@ const CombinePerKeyScope = "CombinePerKey"
 // NewCombine inserts a new Combine edge into the graph. Combines cannot have side
 // input.
 func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) {
+	addContext := func(err error, s *Scope) error {
+		return errors.WithContextf(err, "creating new Combine in scope %v", s)
+	}
+
 	inT := in.Type()
 	if !typex.IsCoGBK(inT) {
-		err := errors.Errorf("Combine requires CoGBK type: %v", inT)
-		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+		return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s)
 	}
 	if len(inT.Components()) > 2 {
-		err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT)
-		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+		return nil, addContext(errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT), s)
 	}
 
 	// Create a synthetic function for binding purposes. It takes main input
@@ -394,7 +393,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*M
 
 	inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
 	if err != nil {
-		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+		return nil, addContext(err, s)
 	}
 
 	edge := g.NewEdge(s)