You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:15:37 UTC
[beam] branch lostluck-protosuffix updated: Update server_test.go
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
new 632535c Update server_test.go
632535c is described below
commit 632535c38acbe241ed4632384b0bac16f402b086
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:15:25 2020 -0700
Update server_test.go
---
sdks/go/pkg/beam/artifact/server_test.go | 26 +++++++++++++-------------
1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/sdks/go/pkg/beam/artifact/server_test.go b/sdks/go/pkg/beam/artifact/server_test.go
index bf14b3b..b2617f8 100644
--- a/sdks/go/pkg/beam/artifact/server_test.go
+++ b/sdks/go/pkg/beam/artifact/server_test.go
@@ -23,7 +23,7 @@ import (
"time"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -42,8 +42,8 @@ func startServer(t *testing.T) *grpc.ClientConn {
real := &server{m: make(map[string]*manifest)}
gs := grpc.NewServer()
- pb.RegisterLegacyArtifactStagingServiceServer(gs, real)
- pb.RegisterLegacyArtifactRetrievalServiceServer(gs, real)
+ jobpb.RegisterLegacyArtifactStagingServiceServer(gs, real)
+ jobpb.RegisterLegacyArtifactRetrievalServiceServer(gs, real)
go gs.Serve(listener)
t.Logf("server listening on %v", endpoint)
@@ -56,12 +56,12 @@ func startServer(t *testing.T) *grpc.ClientConn {
}
type data struct {
- md *pb.ArtifactMetadata
+ md *jobpb.ArtifactMetadata
chunks [][]byte
}
type manifest struct {
- md *pb.Manifest
+ md *jobpb.Manifest
m map[string]*data // key -> data
mu sync.Mutex
}
@@ -72,7 +72,7 @@ type server struct {
mu sync.Mutex
}
-func (s *server) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifactServer) error {
+func (s *server) PutArtifact(ps jobpb.LegacyArtifactStagingService_PutArtifactServer) error {
// Read header
header, err := ps.Recv()
@@ -118,10 +118,10 @@ func (s *server) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifactServe
m.m[key] = &data{chunks: chunks}
m.mu.Unlock()
- return ps.SendAndClose(&pb.PutArtifactResponse{})
+ return ps.SendAndClose(&jobpb.PutArtifactResponse{})
}
-func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+func (s *server) CommitManifest(ctx context.Context, req *jobpb.CommitManifestRequest) (*jobpb.CommitManifestResponse, error) {
token := req.GetStagingSessionToken()
if token == "" {
return nil, errors.New("missing staging session token")
@@ -147,10 +147,10 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
}
m.md = req.GetManifest()
- return &pb.CommitManifestResponse{RetrievalToken: token}, nil
+ return &jobpb.CommitManifestResponse{RetrievalToken: token}, nil
}
-func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
+func (s *server) GetManifest(ctx context.Context, req *jobpb.GetManifestRequest) (*jobpb.GetManifestResponse, error) {
token := req.GetRetrievalToken()
if token == "" {
return nil, errors.New("missing retrieval token")
@@ -163,10 +163,10 @@ func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*
m.mu.Lock()
defer m.mu.Unlock()
- return &pb.GetManifestResponse{Manifest: m.md}, nil
+ return &jobpb.GetManifestResponse{Manifest: m.md}, nil
}
-func (s *server) GetArtifact(req *pb.LegacyGetArtifactRequest, stream pb.LegacyArtifactRetrievalService_GetArtifactServer) error {
+func (s *server) GetArtifact(req *jobpb.LegacyGetArtifactRequest, stream jobpb.LegacyArtifactRetrievalService_GetArtifactServer) error {
token := req.GetRetrievalToken()
if token == "" {
return errors.New("missing retrieval token")
@@ -192,7 +192,7 @@ func (s *server) GetArtifact(req *pb.LegacyGetArtifactRequest, stream pb.LegacyA
// Send chunks exactly as we received them.
for _, chunk := range chunks {
- if err := stream.Send(&pb.ArtifactChunk{Data: chunk}); err != nil {
+ if err := stream.Send(&jobpb.ArtifactChunk{Data: chunk}); err != nil {
return err
}
}