You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:40:00 UTC

[beam] branch lostluck-protosuffix updated: Update statemgr_test.go

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new ca30a64  Update statemgr_test.go
ca30a64 is described below

commit ca30a64c68f9c51f3e348486cbb195260d7dda53
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:39:50 2020 -0700

    Update statemgr_test.go
---
 .../pkg/beam/core/runtime/harness/statemgr_test.go | 48 +++++++++++-----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
index 351dd13..3975217 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
@@ -26,24 +26,24 @@ import (
 	"testing"
 	"time"
 
-	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
 
 // fakeStateClient replicates the call and response protocol
 // of the state channel.
 type fakeStateClient struct {
 	// Blocks the read routine
-	recv    chan *pb.StateResponse
+	recv    chan *fnpb.StateResponse
 	recvErr error
 	recvMu  sync.Mutex
 
 	// Blocks the write routine
-	send    chan *pb.StateRequest
+	send    chan *fnpb.StateRequest
 	sendErr error
 	sendMu  sync.Mutex
 }
 
-func (f *fakeStateClient) Recv() (*pb.StateResponse, error) {
+func (f *fakeStateClient) Recv() (*fnpb.StateResponse, error) {
 	// Blocks until something is sent.
 	v := <-f.recv
 	f.recvMu.Lock()
@@ -51,7 +51,7 @@ func (f *fakeStateClient) Recv() (*pb.StateResponse, error) {
 	return v, f.recvErr
 }
 
-func (f *fakeStateClient) Send(req *pb.StateRequest) error {
+func (f *fakeStateClient) Send(req *fnpb.StateRequest) error {
 	f.send <- req // blocks until consumed.
 	f.sendMu.Lock()
 	defer f.sendMu.Unlock()
@@ -92,10 +92,10 @@ func TestStateChannel(t *testing.T) {
 				for i := 0; i < count; i++ {
 					go func() {
 						req := <-client.send
-						client.recv <- &pb.StateResponse{
+						client.recv <- &fnpb.StateResponse{
 							Id: req.Id, // Ids need to match up to ensure routing can occur properly.
-							Response: &pb.StateResponse_Get{
-								Get: &pb.StateGetResponse{
+							Response: &fnpb.StateResponse_Get{
+								Get: &fnpb.StateGetResponse{
 									ContinuationToken: req.GetGet().GetContinuationToken(),
 								},
 							},
@@ -104,9 +104,9 @@ func TestStateChannel(t *testing.T) {
 				}
 				for i := 0; i < count; i++ {
 					token := []byte(fmt.Sprintf("%d", i))
-					resp, err := c.Send(&pb.StateRequest{
-						Request: &pb.StateRequest_Get{
-							Get: &pb.StateGetRequest{
+					resp, err := c.Send(&fnpb.StateRequest{
+						Request: &fnpb.StateRequest_Get{
+							Get: &fnpb.StateGetRequest{
 								ContinuationToken: token,
 							},
 						},
@@ -127,11 +127,11 @@ func TestStateChannel(t *testing.T) {
 					req := <-client.send // Send should succeed.
 
 					client.setRecvErr(io.EOF)
-					client.recv <- &pb.StateResponse{
+					client.recv <- &fnpb.StateResponse{
 						Id: req.Id,
 					}
 				}()
-				_, err := c.Send(&pb.StateRequest{})
+				_, err := c.Send(&fnpb.StateRequest{})
 				return err
 			},
 			expectedErr:       io.EOF,
@@ -143,11 +143,11 @@ func TestStateChannel(t *testing.T) {
 					req := <-client.send // Send should succeed.
 
 					client.setRecvErr(expectedError)
-					client.recv <- &pb.StateResponse{
+					client.recv <- &fnpb.StateResponse{
 						Id: req.Id,
 					}
 				}()
-				_, err := c.Send(&pb.StateRequest{})
+				_, err := c.Send(&fnpb.StateRequest{})
 				return err
 			},
 			expectedErr:       expectedError,
@@ -163,14 +163,14 @@ func TestStateChannel(t *testing.T) {
 					delete(c.responses, req.Id)
 					c.mu.Unlock()
 
-					resp := &pb.StateResponse{
+					resp := &fnpb.StateResponse{
 						Id: req.Id,
 					}
 					client.recv <- resp
 					// unblock Send.
 					ch <- resp
 				}()
-				_, err := c.Send(&pb.StateRequest{})
+				_, err := c.Send(&fnpb.StateRequest{})
 				return err
 			},
 		}, {
@@ -182,11 +182,11 @@ func TestStateChannel(t *testing.T) {
 					// This can be plumbed through on either side, write or read,
 					// the important part is that we get it.
 					client.setRecvErr(expectedError)
-					client.recv <- &pb.StateResponse{
+					client.recv <- &fnpb.StateResponse{
 						Id: req.Id,
 					}
 				}()
-				_, err := c.Send(&pb.StateRequest{})
+				_, err := c.Send(&fnpb.StateRequest{})
 				return err
 			},
 			expectedErr:       expectedError,
@@ -199,7 +199,7 @@ func TestStateChannel(t *testing.T) {
 					<-client.send
 					// Shouldn't need to unblock any Recv calls.
 				}()
-				_, err := c.Send(&pb.StateRequest{})
+				_, err := c.Send(&fnpb.StateRequest{})
 				return err
 			},
 			expectedErr:       expectedError,
@@ -209,8 +209,8 @@ func TestStateChannel(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			client := &fakeStateClient{
-				recv: make(chan *pb.StateResponse),
-				send: make(chan *pb.StateRequest),
+				recv: make(chan *fnpb.StateResponse),
+				send: make(chan *fnpb.StateRequest),
 			}
 			ctx, cancelFn := context.WithCancel(context.Background())
 			c := makeStateChannel(ctx, cancelFn, "id", client)
@@ -234,9 +234,9 @@ func TestStateChannel(t *testing.T) {
 				client.setRecvErr(nil)
 				// Drain the next send, and ensure the response is unblocked.
 				req := <-client.send
-				client.recv <- &pb.StateResponse{Id: req.Id} // Ids need to match up to ensure routing can occur properly.
+				client.recv <- &fnpb.StateResponse{Id: req.Id} // Ids need to match up to ensure routing can occur properly.
 			}()
-			if _, err := c.Send(&pb.StateRequest{}); !contains(err, test.expectedErr) {
+			if _, err := c.Send(&fnpb.StateRequest{}); !contains(err, test.expectedErr) {
 				t.Errorf("Unexpected error from Send: got %v, want %v", err, test.expectedErr)
 			}