You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:29:02 UTC
[beam] branch lostluck-protosuffix updated: Update staging.go
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
new cf19153 Update staging.go
cf19153 is described below
commit cf191537093d185f577b9a6f8a4349326acbae75
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:28:52 2020 -0700
Update staging.go
---
sdks/go/pkg/beam/artifact/gcsproxy/staging.go | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index fa5933b..095450e 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -26,7 +26,7 @@ import (
"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
@@ -64,7 +64,7 @@ func NewStagingServer(manifest string) (*StagingServer, error) {
}
// CommitManifest commits the given artifact manifest to GCS.
-func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+func (s *StagingServer) CommitManifest(ctx context.Context, req *jobpb.CommitManifestRequest) (*jobpb.CommitManifestResponse, error) {
manifest := req.GetManifest()
s.mu.Lock()
@@ -75,7 +75,7 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
}
s.mu.Unlock()
- data, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
+ data, err := proto.Marshal(&jobpb.ProxyManifest{Manifest: manifest, Location: loc})
if err != nil {
return nil, errors.Wrap(err, "failed to marshal proxy manifest")
}
@@ -93,13 +93,13 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
// now, but would be needed for a staging server that serves multiple
// jobs. Such a server would also use the ID sent with each request.
- return &pb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
+ return &jobpb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
}
// matchLocations ensures that all artifacts have been staged and have valid
// content. It is fine for staged artifacts to not appear in the manifest.
-func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) ([]*pb.ProxyManifest_Location, error) {
- var loc []*pb.ProxyManifest_Location
+func matchLocations(artifacts []*jobpb.ArtifactMetadata, blobs map[string]staged) ([]*jobpb.ProxyManifest_Location, error) {
+ var loc []*jobpb.ProxyManifest_Location
for _, a := range artifacts {
info, ok := blobs[a.Name]
if !ok {
@@ -112,13 +112,13 @@ func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) (
return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
}
- loc = append(loc, &pb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
+ loc = append(loc, &jobpb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
}
return loc, nil
}
// PutArtifact stores the given artifact in GCS.
-func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifactServer) error {
+func (s *StagingServer) PutArtifact(ps jobpb.LegacyArtifactStagingService_PutArtifactServer) error {
// Read header
header, err := ps.Recv()
@@ -153,7 +153,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
s.blobs[md.Name] = staged{object: gcsx.MakeObject(s.bucket, object), hash: hash}
s.mu.Unlock()
- return ps.SendAndClose(&pb.PutArtifactResponse{})
+ return ps.SendAndClose(&jobpb.PutArtifactResponse{})
}
// reader is an adapter between the artifact stream and the GCS stream reader.
@@ -161,7 +161,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
type reader struct {
sha256W hash.Hash
buf []byte
- stream pb.LegacyArtifactStagingService_PutArtifactServer
+ stream jobpb.LegacyArtifactStagingService_PutArtifactServer
}
func (r *reader) Read(buf []byte) (int, error) {