You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:29:02 UTC

[beam] branch lostluck-protosuffix updated: Update staging.go

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new cf19153  Update staging.go
cf19153 is described below

commit cf191537093d185f577b9a6f8a4349326acbae75
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:28:52 2020 -0700

    Update staging.go
---
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index fa5933b..095450e 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -26,7 +26,7 @@ import (
 
 	"cloud.google.com/go/storage"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 	"github.com/golang/protobuf/proto"
 	"golang.org/x/net/context"
@@ -64,7 +64,7 @@ func NewStagingServer(manifest string) (*StagingServer, error) {
 }
 
 // CommitManifest commits the given artifact manifest to GCS.
-func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+func (s *StagingServer) CommitManifest(ctx context.Context, req *jobpb.CommitManifestRequest) (*jobpb.CommitManifestResponse, error) {
 	manifest := req.GetManifest()
 
 	s.mu.Lock()
@@ -75,7 +75,7 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
 	}
 	s.mu.Unlock()
 
-	data, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
+	data, err := proto.Marshal(&jobpb.ProxyManifest{Manifest: manifest, Location: loc})
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to marshal proxy manifest")
 	}
@@ -93,13 +93,13 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
 	// now, but would be needed for a staging server that serves multiple
 	// jobs. Such a server would also use the ID sent with each request.
 
-	return &pb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
+	return &jobpb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
 }
 
 // matchLocations ensures that all artifacts have been staged and have valid
 // content. It is fine for staged artifacts to not appear in the manifest.
-func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) ([]*pb.ProxyManifest_Location, error) {
-	var loc []*pb.ProxyManifest_Location
+func matchLocations(artifacts []*jobpb.ArtifactMetadata, blobs map[string]staged) ([]*jobpb.ProxyManifest_Location, error) {
+	var loc []*jobpb.ProxyManifest_Location
 	for _, a := range artifacts {
 		info, ok := blobs[a.Name]
 		if !ok {
@@ -112,13 +112,13 @@ func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) (
 			return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
 		}
 
-		loc = append(loc, &pb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
+		loc = append(loc, &jobpb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
 	}
 	return loc, nil
 }
 
 // PutArtifact stores the given artifact in GCS.
-func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifactServer) error {
+func (s *StagingServer) PutArtifact(ps jobpb.LegacyArtifactStagingService_PutArtifactServer) error {
 	// Read header
 
 	header, err := ps.Recv()
@@ -153,7 +153,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
 	s.blobs[md.Name] = staged{object: gcsx.MakeObject(s.bucket, object), hash: hash}
 	s.mu.Unlock()
 
-	return ps.SendAndClose(&pb.PutArtifactResponse{})
+	return ps.SendAndClose(&jobpb.PutArtifactResponse{})
 }
 
 // reader is an adapter between the artifact stream and the GCS stream reader.
@@ -161,7 +161,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
 type reader struct {
 	sha256W hash.Hash
 	buf     []byte
-	stream  pb.LegacyArtifactStagingService_PutArtifactServer
+	stream  jobpb.LegacyArtifactStagingService_PutArtifactServer
 }
 
 func (r *reader) Read(buf []byte) (int, error) {