You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:28:09 UTC

[beam] branch lostluck-protosuffix updated: Update retrieval.go

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new de68243  Update retrieval.go
de68243 is described below

commit de682436ca3e245111bb42fa273ec74603f17db2
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:27:57 2020 -0700

    Update retrieval.go
---
 sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
index bf35159..3c7da43 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
@@ -20,7 +20,7 @@ import (
 
 	"cloud.google.com/go/storage"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 	"github.com/golang/protobuf/proto"
 	"golang.org/x/net/context"
@@ -30,12 +30,12 @@ import (
 // Cloud Storage (GCS). It serves a single manifest and ignores
 // the worker id. The server performs no caching or pre-fetching.
 type RetrievalServer struct {
-	md    *pb.Manifest
+	md    *jobpb.Manifest
 	blobs map[string]string
 }
 
 // ReadProxyManifest reads and parses the proxy manifest from GCS.
-func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
+func ReadProxyManifest(ctx context.Context, object string) (*jobpb.ProxyManifest, error) {
 	bucket, obj, err := gcsx.ParseObject(object)
 	if err != nil {
 		return nil, errors.Wrapf(err, "invalid manifest object %v", object)
@@ -49,7 +49,7 @@ func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, e
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to read manifest %v", object)
 	}
-	var md pb.ProxyManifest
+	var md jobpb.ProxyManifest
 	if err := proto.Unmarshal(content, &md); err != nil {
 		return nil, errors.Wrapf(err, "invalid manifest %v", object)
 	}
@@ -58,7 +58,7 @@ func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, e
 
 // NewRetrievalServer creates a artifact retrieval server for the
 // given manifest. It requires that the locations are in GCS.
-func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
+func NewRetrievalServer(md *jobpb.ProxyManifest) (*RetrievalServer, error) {
 	if err := validate(md); err != nil {
 		return nil, err
 	}
@@ -74,12 +74,12 @@ func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
 }
 
 // GetManifest returns the manifest for all artifacts.
-func (s *RetrievalServer) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
-	return &pb.GetManifestResponse{Manifest: s.md}, nil
+func (s *RetrievalServer) GetManifest(ctx context.Context, req *jobpb.GetManifestRequest) (*jobpb.GetManifestResponse, error) {
+	return &jobpb.GetManifestResponse{Manifest: s.md}, nil
 }
 
 // GetArtifact returns a given artifact.
-func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream pb.LegacyArtifactRetrievalService_GetArtifactServer) error {
+func (s *RetrievalServer) GetArtifact(req *jobpb.LegacyGetArtifactRequest, stream jobpb.LegacyArtifactRetrievalService_GetArtifactServer) error {
 	key := req.GetName()
 	blob, ok := s.blobs[key]
 	if !ok {
@@ -105,7 +105,7 @@ func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream p
 	for {
 		n, err := r.Read(data)
 		if n > 0 {
-			if err := stream.Send(&pb.ArtifactChunk{Data: data[:n]}); err != nil {
+			if err := stream.Send(&jobpb.ArtifactChunk{Data: data[:n]}); err != nil {
 				return errors.Wrap(err, "chunk send failed")
 			}
 		}
@@ -119,7 +119,7 @@ func (s *RetrievalServer) GetArtifact(req *pb.LegacyGetArtifactRequest, stream p
 	return nil
 }
 
-func validate(md *pb.ProxyManifest) error {
+func validate(md *jobpb.ProxyManifest) error {
 	keys := make(map[string]bool)
 	for _, a := range md.GetManifest().GetArtifact() {
 		if _, seen := keys[a.Name]; seen {