You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/01 00:54:20 UTC

[beam] branch master updated: [BEAM-7087] Updating Go SDK errors (Part 1)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 62741ed  [BEAM-7087] Updating Go SDK errors (Part 1)
     new 70cf8c6  Merge pull request #8433 from youngoli/beam7087-3
62741ed is described below

commit 62741ed7b3837a58f87163340f1ea061454485ea
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Mon Apr 29 16:30:24 2019 -0700

    [BEAM-7087] Updating Go SDK errors (Part 1)
    
    This commit changes errors from using standard Go error functionality
    to using the internal Beam "errors" package. it covers the
    subdirectories artifact, testing, transforms, and util.
---
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 30 ++++++++++++-------------
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go   | 25 ++++++++++-----------
 sdks/go/pkg/beam/artifact/materialize.go        | 18 +++++++--------
 sdks/go/pkg/beam/artifact/server_test.go        | 26 ++++++++++-----------
 sdks/go/pkg/beam/artifact/stage.go              | 16 ++++++-------
 sdks/go/pkg/beam/testing/passert/hash.go        |  3 ++-
 sdks/go/pkg/beam/testing/passert/passert.go     |  9 ++++----
 sdks/go/pkg/beam/testing/passert/sum.go         |  3 ++-
 sdks/go/pkg/beam/transforms/top/top.go          |  7 +++---
 sdks/go/pkg/beam/util/gcsx/gcs.go               |  5 +++--
 sdks/go/pkg/beam/util/grpcx/dial.go             |  4 ++--
 sdks/go/pkg/beam/util/grpcx/metadata.go         |  7 +++---
 sdks/go/pkg/beam/util/pubsubx/pubsub.go         |  3 ++-
 sdks/go/pkg/beam/util/starcgenx/starcgenx.go    |  5 +++--
 14 files changed, 83 insertions(+), 78 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index 212b6fa..88489e2 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -16,10 +16,10 @@
 package gcsproxy
 
 import (
-	"fmt"
 	"io"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 	"github.com/golang/protobuf/proto"
@@ -38,20 +38,20 @@ type RetrievalServer struct {
 func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
 	bucket, obj, err := gcsx.ParseObject(object)
 	if err != nil {
-		return nil, fmt.Errorf("invalid manifest object %v: %v", object, err)
+		return nil, errors.Wrapf(err, "invalid manifest object %v", object)
 	}
 
 	cl, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+		return nil, errors.Wrap(err, "failed to create GCS client")
 	}
 	content, err := gcsx.ReadObject(ctx, cl, bucket, obj)
 	if err != nil {
-		return nil, fmt.Errorf("failed to read manifest %v: %v", object, err)
+		return nil, errors.Wrapf(err, "failed to read manifest %v", object)
 	}
 	var md pb.ProxyManifest
 	if err := proto.Unmarshal(content, &md); err != nil {
-		return nil, fmt.Errorf("invalid manifest %v: %v", object, err)
+		return nil, errors.Wrapf(err, "invalid manifest %v", object)
 	}
 	return &md, nil
 }
@@ -66,7 +66,7 @@ func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
 	blobs := make(map[string]string)
 	for _, l := range md.GetLocation() {
 		if _, _, err := gcsx.ParseObject(l.GetUri()); err != nil {
-			return nil, fmt.Errorf("location %v is not a GCS object: %v", l.GetUri(), err)
+			return nil, errors.Wrapf(err, "location %v is not a GCS object", l.GetUri())
 		}
 		blobs[l.GetName()] = l.GetUri()
 	}
@@ -83,7 +83,7 @@ func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.Arti
 	key := req.GetName()
 	blob, ok := s.blobs[key]
 	if !ok {
-		return fmt.Errorf("artifact %v not found", key)
+		return errors.Errorf("artifact %v not found", key)
 	}
 
 	bucket, object := parseObject(blob)
@@ -91,13 +91,13 @@ func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.Arti
 	ctx := stream.Context()
 	client, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
 	if err != nil {
-		return fmt.Errorf("Failed to create client for %v: %v", key, err)
+		return errors.Wrapf(err, "Failed to create client for %v", key)
 	}
 
 	// Stream artifact in up to 1MB chunks.
 	r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
 	if err != nil {
-		return fmt.Errorf("Failed to read object for %v: %v", key, err)
+		return errors.Wrapf(err, "Failed to read object for %v", key)
 	}
 	defer r.Close()
 
@@ -106,14 +106,14 @@ func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.Arti
 		n, err := r.Read(data)
 		if n > 0 {
 			if err := stream.Send(&pb.ArtifactChunk{Data: data[:n]}); err != nil {
-				return fmt.Errorf("chunk send failed: %v", err)
+				return errors.Wrap(err, "chunk send failed")
 			}
 		}
 		if err == io.EOF {
 			break
 		}
 		if err != nil {
-			return fmt.Errorf("failed to read from %v: %v", blob, err)
+			return errors.Wrapf(err, "failed to read from %v", blob)
 		}
 	}
 	return nil
@@ -123,24 +123,24 @@ func validate(md *pb.ProxyManifest) error {
 	keys := make(map[string]bool)
 	for _, a := range md.GetManifest().GetArtifact() {
 		if _, seen := keys[a.Name]; seen {
-			return fmt.Errorf("multiple artifact with name %v", a.Name)
+			return errors.Errorf("multiple artifact with name %v", a.Name)
 		}
 		keys[a.Name] = true
 	}
 	for _, l := range md.GetLocation() {
 		fresh, seen := keys[l.Name]
 		if !seen {
-			return fmt.Errorf("no artifact named %v for location %v", l.Name, l.Uri)
+			return errors.Errorf("no artifact named %v for location %v", l.Name, l.Uri)
 		}
 		if !fresh {
-			return fmt.Errorf("multiple locations for %v:%v", l.Name, l.Uri)
+			return errors.Errorf("multiple locations for %v:%v", l.Name, l.Uri)
 		}
 		keys[l.Name] = false
 	}
 
 	for key, fresh := range keys {
 		if fresh {
-			return fmt.Errorf("no location for %v", key)
+			return errors.Errorf("no location for %v", key)
 		}
 	}
 	return nil
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index 02fbe39..e08d1b4 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -20,13 +20,12 @@ import (
 	"bytes"
 	"crypto/sha256"
 	"encoding/hex"
-	"errors"
-	"fmt"
 	"hash"
 	"path"
 	"sync"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 	"github.com/golang/protobuf/proto"
@@ -52,7 +51,7 @@ type staged struct {
 func NewStagingServer(manifest string) (*StagingServer, error) {
 	bucket, object, err := gcsx.ParseObject(manifest)
 	if err != nil {
-		return nil, fmt.Errorf("invalid manifest location: %v", err)
+		return nil, errors.Wrap(err, "invalid manifest location")
 	}
 	root := path.Join(path.Dir(object), "blobs")
 
@@ -78,15 +77,15 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
 
 	data, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
 	if err != nil {
-		return nil, fmt.Errorf("failed to marshal proxy manifest: %v", err)
+		return nil, errors.Wrap(err, "failed to marshal proxy manifest")
 	}
 
 	cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
 	if err != nil {
-		return nil, fmt.Errorf("failed to create GCS client: %v", err)
+		return nil, errors.Wrap(err, "failed to create GCS client")
 	}
 	if err := gcsx.WriteObject(ctx, cl, s.bucket, s.manifest, bytes.NewReader(data)); err != nil {
-		return nil, fmt.Errorf("failed to write manifest: %v", err)
+		return nil, errors.Wrap(err, "failed to write manifest")
 	}
 
 	// Commit returns the location of the manifest as the token, which can
@@ -104,13 +103,13 @@ func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) (
 	for _, a := range artifacts {
 		info, ok := blobs[a.Name]
 		if !ok {
-			return nil, fmt.Errorf("artifact %v not staged", a.Name)
+			return nil, errors.Errorf("artifact %v not staged", a.Name)
 		}
 		if a.Sha256 == "" {
 			a.Sha256 = info.hash
 		}
 		if info.hash != a.Sha256 {
-			return nil, fmt.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
+			return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
 		}
 
 		loc = append(loc, &pb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
@@ -124,11 +123,11 @@ func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServ
 
 	header, err := ps.Recv()
 	if err != nil {
-		return fmt.Errorf("failed to receive header: %v", err)
+		return errors.Wrap(err, "failed to receive header")
 	}
 	md := header.GetMetadata().GetMetadata()
 	if md == nil {
-		return fmt.Errorf("expected header as first message: %v", header)
+		return errors.Errorf("expected header as first message: %v", header)
 	}
 	object := path.Join(s.root, md.Name)
 
@@ -138,16 +137,16 @@ func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServ
 	ctx := ps.Context()
 	cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
 	if err != nil {
-		return fmt.Errorf("failed to create GCS client: %v", err)
+		return errors.Wrap(err, "failed to create GCS client")
 	}
 
 	r := &reader{sha256W: sha256.New(), stream: ps}
 	if err := gcsx.WriteObject(ctx, cl, s.bucket, object, r); err != nil {
-		return fmt.Errorf("failed to stage artifact %v: %v", md.Name, err)
+		return errors.Wrapf(err, "failed to stage artifact %v", md.Name)
 	}
 	hash := r.SHA256()
 	if md.Sha256 != "" && md.Sha256 != hash {
-		return fmt.Errorf("invalid SHA256 for artifact %v: %v want %v", md.Name, hash, md.Sha256)
+		return errors.Errorf("invalid SHA256 for artifact %v: %v want %v", md.Name, hash, md.Sha256)
 	}
 
 	s.mu.Lock()
diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
index 2947dc4..d675620 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
-	"fmt"
 	"io"
 	"math/rand"
 	"os"
@@ -30,6 +29,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -50,7 +50,7 @@ func Materialize(ctx context.Context, endpoint string, rt string, dest string) (
 
 	m, err := client.GetManifest(ctx, &pb.GetManifestRequest{RetrievalToken: rt})
 	if err != nil {
-		return nil, fmt.Errorf("failed to get manifest: %v", err)
+		return nil, errors.Wrap(err, "failed to get manifest")
 	}
 	md := m.GetManifest().GetArtifact()
 	return md, MultiRetrieve(ctx, client, 10, md, rt, dest)
@@ -92,7 +92,7 @@ func MultiRetrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient
 					}
 					failures = append(failures, err.Error())
 					if len(failures) > attempts {
-						permErr.TrySetError(fmt.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
+						permErr.TrySetError(errors.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
 						break // give up
 					}
 					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
@@ -114,7 +114,7 @@ func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
 
 	_, err := os.Stat(filename)
 	if err != nil && !os.IsNotExist(err) {
-		return fmt.Errorf("failed to stat %v: %v", filename, err)
+		return errors.Wrapf(err, "failed to stat %v", filename)
 	}
 	if err == nil {
 		// File already exists. Validate or delete.
@@ -130,7 +130,7 @@ func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
 		}
 
 		if err2 := os.Remove(filename); err2 != nil {
-			return fmt.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
+			return errors.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
 		} // else: successfully deleted bad file.
 	} // else: file does not exist.
 
@@ -159,11 +159,11 @@ func retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
 	sha256Hash, err := retrieveChunks(stream, w)
 	if err != nil {
 		fd.Close() // drop any buffered content
-		return fmt.Errorf("failed to retrieve chunk for %v: %v", filename, err)
+		return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
 	}
 	if err := w.Flush(); err != nil {
 		fd.Close()
-		return fmt.Errorf("failed to flush chunks for %v: %v", filename, err)
+		return errors.Wrapf(err, "failed to flush chunks for %v", filename)
 	}
 	if err := fd.Close(); err != nil {
 		return err
@@ -171,7 +171,7 @@ func retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
 
 	// Artifact Sha256 hash is an optional field in metadata so we should only validate when its present.
 	if a.Sha256 != "" && sha256Hash != a.Sha256 {
-		return fmt.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256)
+		return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256)
 	}
 	return nil
 }
@@ -191,7 +191,7 @@ func retrieveChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w io.W
 			panic(err) // cannot fail
 		}
 		if _, err := w.Write(chunk.Data); err != nil {
-			return "", fmt.Errorf("chunk write failed: %v", err)
+			return "", errors.Wrapf(err, "chunk write failed")
 		}
 	}
 	return hex.EncodeToString(sha256W.Sum(nil)), nil
diff --git a/sdks/go/pkg/beam/artifact/server_test.go b/sdks/go/pkg/beam/artifact/server_test.go
index 173ad0c..c440fbd 100644
--- a/sdks/go/pkg/beam/artifact/server_test.go
+++ b/sdks/go/pkg/beam/artifact/server_test.go
@@ -16,13 +16,13 @@
 package artifact
 
 import (
-	"fmt"
 	"io"
 	"net"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 	"golang.org/x/net/context"
@@ -77,14 +77,14 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
 
 	header, err := ps.Recv()
 	if err != nil {
-		return fmt.Errorf("failed to receive header: %v", err)
+		return errors.Wrap(err, "failed to receive header")
 	}
 	if header.GetMetadata() == nil {
-		return fmt.Errorf("expected header as first message: %v", header)
+		return errors.Errorf("expected header as first message: %v", header)
 	}
 	key := header.GetMetadata().GetMetadata().Name
 	if header.GetMetadata().GetStagingSessionToken() == "" {
-		return fmt.Errorf("missing staging session token")
+		return errors.New("missing staging session token")
 	}
 	token := header.GetMetadata().GetStagingSessionToken()
 
@@ -101,10 +101,10 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
 		}
 
 		if msg.GetData() == nil {
-			return fmt.Errorf("expected data: %v", msg)
+			return errors.Errorf("expected data: %v", msg)
 		}
 		if len(msg.GetData().GetData()) == 0 {
-			return fmt.Errorf("expected non-empty data: %v", msg)
+			return errors.Errorf("expected non-empty data: %v", msg)
 		}
 		chunks = append(chunks, msg.GetData().GetData())
 	}
@@ -124,7 +124,7 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
 func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
 	token := req.GetStagingSessionToken()
 	if token == "" {
-		return nil, fmt.Errorf("missing staging session token")
+		return nil, errors.New("missing staging session token")
 	}
 
 	m := s.getManifest(token, true)
@@ -136,7 +136,7 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
 	artifacts := req.GetManifest().GetArtifact()
 	for _, md := range artifacts {
 		if _, ok := m.m[md.Name]; !ok {
-			return nil, fmt.Errorf("artifact %v not staged", md.Name)
+			return nil, errors.Errorf("artifact %v not staged", md.Name)
 		}
 	}
 
@@ -153,12 +153,12 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
 func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
 	token := req.GetRetrievalToken()
 	if token == "" {
-		return nil, fmt.Errorf("missing retrieval token")
+		return nil, errors.New("missing retrieval token")
 	}
 
 	m := s.getManifest(token, false)
 	if m == nil || m.md == nil {
-		return nil, fmt.Errorf("manifest for %v not found", token)
+		return nil, errors.Errorf("manifest for %v not found", token)
 	}
 	m.mu.Lock()
 	defer m.mu.Unlock()
@@ -169,12 +169,12 @@ func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*
 func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
 	token := req.GetRetrievalToken()
 	if token == "" {
-		return fmt.Errorf("missing retrieval token")
+		return errors.New("missing retrieval token")
 	}
 
 	m := s.getManifest(token, false)
 	if m == nil || m.md == nil {
-		return fmt.Errorf("manifest for %v not found", token)
+		return errors.Errorf("manifest for %v not found", token)
 	}
 
 	// Validate artifact and grab chunks so that we can stream them without
@@ -184,7 +184,7 @@ func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetri
 	elm, ok := m.m[req.GetName()]
 	if !ok || elm.md == nil {
 		m.mu.Unlock()
-		return fmt.Errorf("manifest for %v does not contain artifact %v", token, req.GetName())
+		return errors.Errorf("manifest for %v does not contain artifact %v", token, req.GetName())
 	}
 	chunks := elm.chunks
 	m.mu.Unlock()
diff --git a/sdks/go/pkg/beam/artifact/stage.go b/sdks/go/pkg/beam/artifact/stage.go
index bf448cd..1eba505 100644
--- a/sdks/go/pkg/beam/artifact/stage.go
+++ b/sdks/go/pkg/beam/artifact/stage.go
@@ -19,7 +19,6 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
-	"fmt"
 	"io"
 	"io/ioutil"
 	"math/rand"
@@ -30,6 +29,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
@@ -103,7 +103,7 @@ func MultiStage(ctx context.Context, client pb.ArtifactStagingServiceClient, cpu
 					}
 					failures = append(failures, err.Error())
 					if len(failures) > attempts {
-						permErr.TrySetError(fmt.Errorf("failed to stage %v in %v attempts: %v", f.Filename, attempts, strings.Join(failures, "; ")))
+						permErr.TrySetError(errors.Errorf("failed to stage %v in %v attempts: %v", f.Filename, attempts, strings.Join(failures, "; ")))
 						break // give up
 					}
 					time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
@@ -156,18 +156,18 @@ func Stage(ctx context.Context, client pb.ArtifactStagingServiceClient, key, fil
 	}
 	if err := stream.Send(header); err != nil {
 		stream.CloseAndRecv() // ignore error
-		return nil, fmt.Errorf("failed to send header for %v: %v", filename, err)
+		return nil, errors.Wrapf(err, "failed to send header for %v", filename)
 	}
 	stagedHash, err := stageChunks(stream, fd)
 	if err != nil {
 		stream.CloseAndRecv() // ignore error
-		return nil, fmt.Errorf("failed to send chunks for %v: %v", filename, err)
+		return nil, errors.Wrapf(err, "failed to send chunks for %v", filename)
 	}
 	if _, err := stream.CloseAndRecv(); err != nil && err != io.EOF {
-		return nil, fmt.Errorf("failed to close stream for %v: %v", filename, err)
+		return nil, errors.Wrapf(err, "failed to close stream for %v", filename)
 	}
 	if hash != stagedHash {
-		return nil, fmt.Errorf("unexpected SHA256 for sent chunks for %v: %v, want %v", filename, stagedHash, hash)
+		return nil, errors.Errorf("unexpected SHA256 for sent chunks for %v: %v, want %v", filename, stagedHash, hash)
 	}
 	return md, nil
 }
@@ -190,7 +190,7 @@ func stageChunks(stream pb.ArtifactStagingService_PutArtifactClient, r io.Reader
 				},
 			}
 			if err := stream.Send(chunk); err != nil {
-				return "", fmt.Errorf("chunk send failed: %v", err)
+				return "", errors.Wrap(err, "chunk send failed")
 			}
 		}
 		if err == io.EOF {
@@ -211,7 +211,7 @@ type KeyedFile struct {
 func scan(dir string) ([]KeyedFile, error) {
 	var ret []KeyedFile
 	if err := walk(dir, "", &ret); err != nil {
-		return nil, fmt.Errorf("failed to scan %v for artifacts to stage: %v", dir, err)
+		return nil, errors.Wrapf(err, "failed to scan %v for artifacts to stage", dir)
 	}
 	return ret, nil
 }
diff --git a/sdks/go/pkg/beam/testing/passert/hash.go b/sdks/go/pkg/beam/testing/passert/hash.go
index 4e5c4e6..225e5f7 100644
--- a/sdks/go/pkg/beam/testing/passert/hash.go
+++ b/sdks/go/pkg/beam/testing/passert/hash.go
@@ -22,6 +22,7 @@ import (
 	"sort"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Hash validates that the incoming PCollection<string> has the given size and
@@ -58,7 +59,7 @@ func (f *hashFn) ProcessElement(_ int, lines func(*string) bool) error {
 	hash := base64.StdEncoding.EncodeToString(md5W.Sum(nil))
 
 	if f.Size != len(col) || f.Hash != hash {
-		return fmt.Errorf("passert.Hash(%v) = (%v,%v), want (%v,%v)", f.Name, len(col), hash, f.Size, f.Hash)
+		return errors.Errorf("passert.Hash(%v) = (%v,%v), want (%v,%v)", f.Name, len(col), hash, f.Size, f.Hash)
 	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/testing/passert/passert.go b/sdks/go/pkg/beam/testing/passert/passert.go
index 21c9c4c..da38230 100644
--- a/sdks/go/pkg/beam/testing/passert/passert.go
+++ b/sdks/go/pkg/beam/testing/passert/passert.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
 )
 
@@ -135,7 +136,7 @@ func index(enc beam.ElementEncoder, iter func(*beam.T) bool) (map[string]indexEn
 	for iter(&val) {
 		var buf bytes.Buffer
 		if err := enc.Encode(val, &buf); err != nil {
-			return nil, fmt.Errorf("value %v not encodable with %v", val, enc)
+			return nil, errors.Errorf("value %v not encodable with %v", val, enc)
 		}
 		encoded := buf.String()
 
@@ -183,7 +184,7 @@ type failFn struct {
 }
 
 func (f *failFn) ProcessElement(x beam.X) error {
-	return fmt.Errorf(f.Format, x)
+	return errors.Errorf(f.Format, x)
 }
 
 type failKVFn struct {
@@ -191,7 +192,7 @@ type failKVFn struct {
 }
 
 func (f *failKVFn) ProcessElement(x beam.X, y beam.Y) error {
-	return fmt.Errorf(f.Format, fmt.Sprintf("(%v,%v)", x, y))
+	return errors.Errorf(f.Format, fmt.Sprintf("(%v,%v)", x, y))
 }
 
 type failGBKFn struct {
@@ -199,5 +200,5 @@ type failGBKFn struct {
 }
 
 func (f *failGBKFn) ProcessElement(x beam.X, _ func(*beam.Y) bool) error {
-	return fmt.Errorf(f.Format, fmt.Sprintf("(%v,*)", x))
+	return errors.Errorf(f.Format, fmt.Sprintf("(%v,*)", x))
 }
diff --git a/sdks/go/pkg/beam/testing/passert/sum.go b/sdks/go/pkg/beam/testing/passert/sum.go
index 63c72a3..2ef2ea4 100644
--- a/sdks/go/pkg/beam/testing/passert/sum.go
+++ b/sdks/go/pkg/beam/testing/passert/sum.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Sum validates that the incoming PCollection<int> is a singleton
@@ -46,7 +47,7 @@ func (f *sumFn) ProcessElement(_ int, values func(*int) bool) error {
 	}
 
 	if f.Sum != sum || f.Size != count {
-		return fmt.Errorf("passert.Sum(%v) = {%v, size: %v}, want {%v, size:%v}", f.Name, sum, count, f.Sum, f.Size)
+		return errors.Errorf("passert.Sum(%v) = {%v, size: %v}, want {%v, size:%v}", f.Name, sum, count, f.Sum, f.Size)
 	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/transforms/top/top.go b/sdks/go/pkg/beam/transforms/top/top.go
index 869510a..9ca2797 100644
--- a/sdks/go/pkg/beam/transforms/top/top.go
+++ b/sdks/go/pkg/beam/transforms/top/top.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
@@ -138,7 +139,7 @@ func (a *accum) unmarshal() error {
 	for _, val := range a.data {
 		element, err := a.dec.Decode(bytes.NewBuffer(val))
 		if err != nil {
-			return fmt.Errorf("top.accum: error unmarshal: %v", err)
+			return errors.WithContextf(err, "top.accum: unmarshalling")
 		}
 		a.list = append(a.list, element)
 	}
@@ -149,13 +150,13 @@ func (a *accum) unmarshal() error {
 // MarshalJSON uses the hook into the JSON encoder library to encode the accumulator.
 func (a accum) MarshalJSON() ([]byte, error) {
 	if a.enc == nil {
-		return nil, fmt.Errorf("top.accum: element encoder unspecified")
+		return nil, errors.Errorf("top.accum: element encoder unspecified")
 	}
 	var values [][]byte
 	for _, value := range a.list {
 		var buf bytes.Buffer
 		if err := a.enc.Encode(value, &buf); err != nil {
-			return nil, fmt.Errorf("top.accum: marshalling of %v failed: %v", value, err)
+			return nil, errors.WithContextf(err, "top.accum: marshalling %v", value)
 		}
 		values = append(values, buf.Bytes())
 	}
diff --git a/sdks/go/pkg/beam/util/gcsx/gcs.go b/sdks/go/pkg/beam/util/gcsx/gcs.go
index 3601b80..70a325b 100644
--- a/sdks/go/pkg/beam/util/gcsx/gcs.go
+++ b/sdks/go/pkg/beam/util/gcsx/gcs.go
@@ -25,6 +25,7 @@ import (
 	"path"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"google.golang.org/api/option"
 )
 
@@ -111,10 +112,10 @@ func ParseObject(object string) (bucket, path string, err error) {
 	}
 
 	if parsed.Scheme != "gs" {
-		return "", "", fmt.Errorf("object %s must have 'gs' scheme", object)
+		return "", "", errors.Errorf("object %s must have 'gs' scheme", object)
 	}
 	if parsed.Host == "" {
-		return "", "", fmt.Errorf("object %s must have bucket", object)
+		return "", "", errors.Errorf("object %s must have bucket", object)
 	}
 	if parsed.Path == "" {
 		return parsed.Host, "", nil
diff --git a/sdks/go/pkg/beam/util/grpcx/dial.go b/sdks/go/pkg/beam/util/grpcx/dial.go
index 1299b71..8651c96 100644
--- a/sdks/go/pkg/beam/util/grpcx/dial.go
+++ b/sdks/go/pkg/beam/util/grpcx/dial.go
@@ -17,9 +17,9 @@ package grpcx
 
 import (
 	"context"
-	"fmt"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"google.golang.org/grpc"
 )
 
@@ -35,7 +35,7 @@ func DefaultDial(ctx context.Context, endpoint string, timeout time.Duration) (*
 	cc, err := grpc.DialContext(ctx, endpoint, grpc.WithInsecure(), grpc.WithBlock(),
 		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(50<<20)))
 	if err != nil {
-		return nil, fmt.Errorf("failed to dial server at %v: %v", endpoint, err)
+		return nil, errors.Wrapf(err, "failed to dial server at %v", endpoint)
 	}
 	return cc, nil
 }
diff --git a/sdks/go/pkg/beam/util/grpcx/metadata.go b/sdks/go/pkg/beam/util/grpcx/metadata.go
index 0d77d62..7c0651f 100644
--- a/sdks/go/pkg/beam/util/grpcx/metadata.go
+++ b/sdks/go/pkg/beam/util/grpcx/metadata.go
@@ -18,9 +18,8 @@ package grpcx
 
 import (
 	"context"
-	"errors"
-	"fmt"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"google.golang.org/grpc/metadata"
 )
 
@@ -34,10 +33,10 @@ func ReadWorkerID(ctx context.Context) (string, error) {
 	}
 	id, ok := md[idKey]
 	if !ok || len(id) < 1 {
-		return "", fmt.Errorf("failed to find worker id in metadata %v", md)
+		return "", errors.Errorf("failed to find worker id in metadata %v", md)
 	}
 	if len(id) > 1 {
-		return "", fmt.Errorf("multiple worker ids in metadata: %v", id)
+		return "", errors.Errorf("multiple worker ids in metadata: %v", id)
 	}
 	return id[0], nil
 }
diff --git a/sdks/go/pkg/beam/util/pubsubx/pubsub.go b/sdks/go/pkg/beam/util/pubsubx/pubsub.go
index 281b41d..585b4dc 100644
--- a/sdks/go/pkg/beam/util/pubsubx/pubsub.go
+++ b/sdks/go/pkg/beam/util/pubsubx/pubsub.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -102,7 +103,7 @@ func Publish(ctx context.Context, project, topic string, messages ...string) (*p
 		}
 		id, err := t.Publish(ctx, m).Get(ctx)
 		if err != nil {
-			return nil, fmt.Errorf("failed to publish '%v': %v", msg, err)
+			return nil, errors.Wrapf(err, "failed to publish '%v'", msg)
 		}
 		log.Infof(ctx, "Published %v with id: %v", msg, id)
 	}
diff --git a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go
index eb81ea8..232a421 100644
--- a/sdks/go/pkg/beam/util/starcgenx/starcgenx.go
+++ b/sdks/go/pkg/beam/util/starcgenx/starcgenx.go
@@ -30,6 +30,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/shimx"
 )
 
@@ -151,7 +152,7 @@ func (e *Extractor) FromAsts(imp types.Importer, fset *token.FileSet, files []*a
 	}
 
 	if _, err := conf.Check(e.Package, fset, files, info); err != nil {
-		return fmt.Errorf("failed to type check package %s : %v", e.Package, err)
+		return errors.Wrapf(err, "failed to type check package %s : %v", e.Package)
 	}
 
 	e.Print("/*\n")
@@ -179,7 +180,7 @@ func (e *Extractor) FromAsts(imp types.Importer, fset *token.FileSet, files []*a
 		}
 	}
 	if len(notFound) > 0 {
-		return fmt.Errorf("couldn't find the following identifiers; please check for typos, or remove them: %v", strings.Join(notFound, ", "))
+		return errors.Errorf("couldn't find the following identifiers; please check for typos, or remove them: %v", strings.Join(notFound, ", "))
 	}
 	e.Print("*/\n")