You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:40:00 UTC
[beam] branch lostluck-protosuffix updated: Update statemgr_test.go
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
new ca30a64 Update statemgr_test.go
ca30a64 is described below
commit ca30a64c68f9c51f3e348486cbb195260d7dda53
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:39:50 2020 -0700
Update statemgr_test.go
---
.../pkg/beam/core/runtime/harness/statemgr_test.go | 48 +++++++++++-----------
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
index 351dd13..3975217 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
@@ -26,24 +26,24 @@ import (
"testing"
"time"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
)
// fakeStateClient replicates the call and response protocol
// of the state channel.
type fakeStateClient struct {
// Blocks the read routine
- recv chan *pb.StateResponse
+ recv chan *fnpb.StateResponse
recvErr error
recvMu sync.Mutex
// Blocks the write routine
- send chan *pb.StateRequest
+ send chan *fnpb.StateRequest
sendErr error
sendMu sync.Mutex
}
-func (f *fakeStateClient) Recv() (*pb.StateResponse, error) {
+func (f *fakeStateClient) Recv() (*fnpb.StateResponse, error) {
// Blocks until something is sent.
v := <-f.recv
f.recvMu.Lock()
@@ -51,7 +51,7 @@ func (f *fakeStateClient) Recv() (*pb.StateResponse, error) {
return v, f.recvErr
}
-func (f *fakeStateClient) Send(req *pb.StateRequest) error {
+func (f *fakeStateClient) Send(req *fnpb.StateRequest) error {
f.send <- req // blocks until consumed.
f.sendMu.Lock()
defer f.sendMu.Unlock()
@@ -92,10 +92,10 @@ func TestStateChannel(t *testing.T) {
for i := 0; i < count; i++ {
go func() {
req := <-client.send
- client.recv <- &pb.StateResponse{
+ client.recv <- &fnpb.StateResponse{
Id: req.Id, // Ids need to match up to ensure routing can occur properly.
- Response: &pb.StateResponse_Get{
- Get: &pb.StateGetResponse{
+ Response: &fnpb.StateResponse_Get{
+ Get: &fnpb.StateGetResponse{
ContinuationToken: req.GetGet().GetContinuationToken(),
},
},
@@ -104,9 +104,9 @@ func TestStateChannel(t *testing.T) {
}
for i := 0; i < count; i++ {
token := []byte(fmt.Sprintf("%d", i))
- resp, err := c.Send(&pb.StateRequest{
- Request: &pb.StateRequest_Get{
- Get: &pb.StateGetRequest{
+ resp, err := c.Send(&fnpb.StateRequest{
+ Request: &fnpb.StateRequest_Get{
+ Get: &fnpb.StateGetRequest{
ContinuationToken: token,
},
},
@@ -127,11 +127,11 @@ func TestStateChannel(t *testing.T) {
req := <-client.send // Send should succeed.
client.setRecvErr(io.EOF)
- client.recv <- &pb.StateResponse{
+ client.recv <- &fnpb.StateResponse{
Id: req.Id,
}
}()
- _, err := c.Send(&pb.StateRequest{})
+ _, err := c.Send(&fnpb.StateRequest{})
return err
},
expectedErr: io.EOF,
@@ -143,11 +143,11 @@ func TestStateChannel(t *testing.T) {
req := <-client.send // Send should succeed.
client.setRecvErr(expectedError)
- client.recv <- &pb.StateResponse{
+ client.recv <- &fnpb.StateResponse{
Id: req.Id,
}
}()
- _, err := c.Send(&pb.StateRequest{})
+ _, err := c.Send(&fnpb.StateRequest{})
return err
},
expectedErr: expectedError,
@@ -163,14 +163,14 @@ func TestStateChannel(t *testing.T) {
delete(c.responses, req.Id)
c.mu.Unlock()
- resp := &pb.StateResponse{
+ resp := &fnpb.StateResponse{
Id: req.Id,
}
client.recv <- resp
// unblock Send.
ch <- resp
}()
- _, err := c.Send(&pb.StateRequest{})
+ _, err := c.Send(&fnpb.StateRequest{})
return err
},
}, {
@@ -182,11 +182,11 @@ func TestStateChannel(t *testing.T) {
// This can be plumbed through on either side, write or read,
// the important part is that we get it.
client.setRecvErr(expectedError)
- client.recv <- &pb.StateResponse{
+ client.recv <- &fnpb.StateResponse{
Id: req.Id,
}
}()
- _, err := c.Send(&pb.StateRequest{})
+ _, err := c.Send(&fnpb.StateRequest{})
return err
},
expectedErr: expectedError,
@@ -199,7 +199,7 @@ func TestStateChannel(t *testing.T) {
<-client.send
// Shouldn't need to unblock any Recv calls.
}()
- _, err := c.Send(&pb.StateRequest{})
+ _, err := c.Send(&fnpb.StateRequest{})
return err
},
expectedErr: expectedError,
@@ -209,8 +209,8 @@ func TestStateChannel(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := &fakeStateClient{
- recv: make(chan *pb.StateResponse),
- send: make(chan *pb.StateRequest),
+ recv: make(chan *fnpb.StateResponse),
+ send: make(chan *fnpb.StateRequest),
}
ctx, cancelFn := context.WithCancel(context.Background())
c := makeStateChannel(ctx, cancelFn, "id", client)
@@ -234,9 +234,9 @@ func TestStateChannel(t *testing.T) {
client.setRecvErr(nil)
// Drain the next send, and ensure the response is unblocked.
req := <-client.send
- client.recv <- &pb.StateResponse{Id: req.Id} // Ids need to match up to ensure routing can occur properly.
+ client.recv <- &fnpb.StateResponse{Id: req.Id} // Ids need to match up to ensure routing can occur properly.
}()
- if _, err := c.Send(&pb.StateRequest{}); !contains(err, test.expectedErr) {
+ if _, err := c.Send(&fnpb.StateRequest{}); !contains(err, test.expectedErr) {
t.Errorf("Unexpected error from Send: got %v, want %v", err, test.expectedErr)
}