You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/24 02:48:05 UTC
[beam] branch master updated: [BEAM-7086] Refactoring repetitive error code.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aef01da [BEAM-7086] Refactoring repetitive error code.
new 5556467 Merge pull request #8674 from youngoli/beam7086
aef01da is described below
commit aef01da787f6c92bf8d7491dfa89ac43cbe85ace
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Thu May 23 15:46:19 2019 -0700
[BEAM-7086] Refactoring repetitive error code.
Straightforward change, make some function literals for the really
repetitive errors.
---
sdks/go/pkg/beam/core/graph/bind.go | 30 ++++++++++++----------
sdks/go/pkg/beam/core/graph/edge.go | 51 ++++++++++++++++++-------------------
2 files changed, 42 insertions(+), 39 deletions(-)
diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go
index 77a6d69..f5cc7f8 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -60,30 +60,34 @@ import (
//
// Here, the inbound shape and output types are different from before.
func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) {
+ addContext := func(err error, fn *funcx.Fn) error {
+ return errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ }
+
inbound, kinds, err := findInbound(fn, in...)
if err != nil {
- return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ return nil, nil, nil, nil, addContext(err, fn)
}
outbound, err := findOutbound(fn)
if err != nil {
- return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ return nil, nil, nil, nil, addContext(err, fn)
}
subst, err := typex.Bind(inbound, in)
if err != nil {
- return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ return nil, nil, nil, nil, addContext(err, fn)
}
for k, v := range typedefs {
if substK, exists := subst[k]; exists {
err := errors.Errorf("cannot substitute type %v with %v, already defined as %v", k, v, substK)
- return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ return nil, nil, nil, nil, addContext(err, fn)
}
subst[k] = v
}
out, err := typex.Substitute(outbound, subst)
if err != nil {
- return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
+ return nil, nil, nil, nil, addContext(err, fn)
}
return inbound, kinds, outbound, out, nil
}
@@ -128,6 +132,9 @@ func returnTypes(list []funcx.ReturnParam) []reflect.Type {
func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputKind, error) {
// log.Printf("Bind inbound: %v %v", fn, in)
+ addContext := func(err error, p []funcx.FnParam, in interface{}) error {
+ return errors.WithContextf(err, "binding params %v to input %v", p, in)
+ }
var inbound []typex.FullType
var kinds []InputKind
@@ -136,29 +143,26 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputK
for _, input := range in {
arity, err := inboundArity(input, index == 0)
if err != nil {
- return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params, input)
+ return nil, nil, addContext(err, params, input)
}
if len(params)-index < arity {
- err := errors.New("too few params")
- return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params[index:], input)
+ return nil, nil, addContext(errors.New("too few params"), params[index:], input)
}
paramsToBind := params[index : index+arity]
elm, kind, err := tryBindInbound(input, paramsToBind, index == 0)
if err != nil {
- return nil, nil, errors.WithContextf(err, "binding params %v to input %v", paramsToBind, input)
+ return nil, nil, addContext(err, paramsToBind, input)
}
inbound = append(inbound, elm)
kinds = append(kinds, kind)
index += arity
}
if index < len(params) {
- err := errors.New("too few inputs: forgot an input or to annotate options?")
- return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
+ return nil, nil, addContext(errors.New("too few inputs: forgot an input or to annotate options?"), params, in)
}
if index > len(params) {
- err := errors.New("too many inputs")
- return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
+ return nil, nil, addContext(errors.New("too many inputs"), params, in)
}
return inbound, kinds, nil
}
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 846dceb..dcdfd0b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -186,14 +186,15 @@ func (e *MultiEdge) String() string {
// NewCoGBK inserts a new CoGBK edge into the graph.
func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
+ addContext := func(err error, s *Scope) error {
+ return errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ }
+
if len(ns) == 0 {
- // TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file.
- err := errors.New("needs at least 1 input")
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.New("needs at least 1 input"), s)
}
if !typex.IsKV(ns[0].Type()) {
- err := errors.Errorf("input type must be KV: %v", ns[0])
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.Errorf("input type must be KV: %v", ns[0]), s)
}
// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>.
@@ -206,20 +207,16 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
for i := 1; i < len(ns); i++ {
n := ns[i]
if !typex.IsKV(n.Type()) {
- err := errors.Errorf("input type must be KV: %v", n)
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.Errorf("input type must be KV: %v", n), s)
}
if !n.Coder.Components[0].Equals(c) {
- err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c)
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c), s)
}
if !w.Equals(n.WindowingStrategy()) {
- err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w)
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w), s)
}
if bounded != n.Bounded() {
- err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded)
- return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
+ return nil, addContext(errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded), s)
}
comp = append(comp, n.Type().Components()[1])
@@ -242,9 +239,12 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
// NewFlatten inserts a new Flatten edge in the graph. Flatten output type is
// the shared input type.
func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
+ addContext := func(err error, s *Scope) error {
+ return errors.WithContextf(err, "creating new Flatten in scope %v", s)
+ }
+
if len(in) < 2 {
- err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in))
- return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+ return nil, addContext(errors.Errorf("Flatten needs at least 2 input, got %v", len(in)), s)
}
t := in[0].Type()
w := inputWindow(in)
@@ -260,17 +260,14 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
}
for _, n := range in {
if !typex.IsEqual(t, n.Type()) {
- err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t)
- return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+ return nil, addContext(errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t), s)
}
if !w.Equals(n.WindowingStrategy()) {
- err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w)
- return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+ return nil, addContext(errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w), s)
}
}
if typex.IsCoGBK(t) {
- err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t)
- return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
+ return nil, addContext(errors.Errorf("Flatten input type cannot be CoGBK: %v", t), s)
}
edge := g.NewEdge(s)
@@ -338,14 +335,16 @@ const CombinePerKeyScope = "CombinePerKey"
// NewCombine inserts a new Combine edge into the graph. Combines cannot have side
// input.
func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) {
+ addContext := func(err error, s *Scope) error {
+ return errors.WithContextf(err, "creating new Combine in scope %v", s)
+ }
+
inT := in.Type()
if !typex.IsCoGBK(inT) {
- err := errors.Errorf("Combine requires CoGBK type: %v", inT)
- return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+ return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s)
}
if len(inT.Components()) > 2 {
- err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT)
- return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+ return nil, addContext(errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT), s)
}
// Create a synthetic function for binding purposes. It takes main input
@@ -394,7 +393,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*M
inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
if err != nil {
- return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
+ return nil, addContext(err, s)
}
edge := g.NewEdge(s)