You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:28:09 UTC
[beam] branch lostluck-protosuffix updated: Update retrieval.go
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
new de68243 Update retrieval.go
de68243 is described below
commit de682436ca3e245111bb42fa273ec74603f17db2
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:27:57 2020 -0700
Update retrieval.go
---
sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index bf35159..3c7da43 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -20,7 +20,7 @@ import (
"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
@@ -30,12 +30,12 @@ import (
// Cloud Storage (GCS). It serves a single manifest and ignores
// the worker id. The server performs no caching or pre-fetching.
type RetrievalServer struct {
- md *pb.Manifest
+ md *jobpb.Manifest
blobs map[string]string
}
// ReadProxyManifest reads and parses the proxy manifest from GCS.
-func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
+func ReadProxyManifest(ctx context.Context, object string) (*jobpb.ProxyManifest, error) {
bucket, obj, err := gcsx.ParseObject(object)
if err != nil {
return nil, errors.Wrapf(err, "invalid manifest object %v", object)
@@ -49,7 +49,7 @@ func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, e
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest %v", object)
}
- var md pb.ProxyManifest
+ var md jobpb.ProxyManifest
if err := proto.Unmarshal(content, &md); err != nil {
return nil, errors.Wrapf(err, "invalid manifest %v", object)
}
@@ -58,7 +58,7 @@ func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, e
// NewRetrievalServer creates an artifact retrieval server for the
// given manifest. It requires that the locations are in GCS.
-func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
+func NewRetrievalServer(md *jobpb.ProxyManifest) (*RetrievalServer, error) {
if err := validate(md); err != nil {
return nil, err
}
@@ -74,12 +74,12 @@ func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
}
// GetManifest returns the manifest for all artifacts.
-func (s *RetrievalServer) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
- return &pb.GetManifestResponse{Manifest: s.md}, nil
+func (s *RetrievalServer) GetManifest(ctx context.Context, req *jobpb.GetManifestRequest) (*jobpb.GetManifestResponse, error) {
+ return &jobpb.GetManifestResponse{Manifest: s.md}, nil
}
// GetArtifact returns a given artifact.
-func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream pb.LegacyArtifactRetrievalService_GetArtifactServer) error {
+func (s *RetrievalServer) GetArtifact(req *jobpb.LegacyGetArtifactRequest, stream jobpb.LegacyArtifactRetrievalService_GetArtifactServer) error {
key := req.GetName()
blob, ok := s.blobs[key]
if !ok {
@@ -105,7 +105,7 @@ func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream p
for {
n, err := r.Read(data)
if n > 0 {
- if err := stream.Send(&pb.ArtifactChunk{Data: data[:n]}); err != nil {
+ if err := stream.Send(&jobpb.ArtifactChunk{Data: data[:n]}); err != nil {
return errors.Wrap(err, "chunk send failed")
}
}
@@ -119,7 +119,7 @@ func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream p
return nil
}
-func validate(md *pb.ProxyManifest) error {
+func validate(md *jobpb.ProxyManifest) error {
keys := make(map[string]bool)
for _, a := range md.GetManifest().GetArtifact() {
if _, seen := keys[a.Name]; seen {