You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:38:41 UTC

[beam] branch lostluck-protosuffix updated: Update statemgr.go

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new e1a2730  Update statemgr.go
e1a2730 is described below

commit e1a2730a3adc573dec73f2bc6e0cbecc3db524c8
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:38:30 2020 -0700

    Update statemgr.go
---
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 46 +++++++++++------------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 4c0e7f5..3ba9d27 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -26,7 +26,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
-	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/golang/protobuf/proto"
 )
 
@@ -104,7 +104,7 @@ func (s *ScopedStateReader) Close() error {
 
 type stateKeyReader struct {
 	instID instructionID
-	key    *pb.StateKey
+	key    *fnpb.StateKey
 
 	token []byte
 	buf   []byte
@@ -116,9 +116,9 @@ type stateKeyReader struct {
 }
 
 func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string, instID instructionID, k, w []byte) *stateKeyReader {
-	key := &pb.StateKey{
-		Type: &pb.StateKey_MultimapSideInput_{
-			MultimapSideInput: &pb.StateKey_MultimapSideInput{
+	key := &fnpb.StateKey{
+		Type: &fnpb.StateKey_MultimapSideInput_{
+			MultimapSideInput: &fnpb.StateKey_MultimapSideInput{
 				TransformId: id.PtransformID,
 				SideInputId: sideInputID,
 				Window:      w,
@@ -134,9 +134,9 @@ func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string,
 }
 
 func newRunnerReader(ch *StateChannel, instID instructionID, k []byte) *stateKeyReader {
-	key := &pb.StateKey{
-		Type: &pb.StateKey_Runner_{
-			Runner: &pb.StateKey_Runner{
+	key := &fnpb.StateKey{
+		Type: &fnpb.StateKey_Runner_{
+			Runner: &fnpb.StateKey_Runner{
 				Key: k,
 			},
 		},
@@ -164,12 +164,12 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) {
 		localChannel := r.ch
 		r.mu.Unlock()
 
-		req := &pb.StateRequest{
+		req := &fnpb.StateRequest{
 			// Id: set by StateChannel
 			InstructionId: string(r.instID),
 			StateKey:      r.key,
-			Request: &pb.StateRequest_Get{
-				Get: &pb.StateGetRequest{
+			Request: &fnpb.StateRequest_Get{
+				Get: &fnpb.StateGetRequest{
 					ContinuationToken: r.token,
 				},
 			},
@@ -243,8 +243,8 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
 }
 
 type stateClient interface {
-	Send(*pb.StateRequest) error
-	Recv() (*pb.StateResponse, error)
+	Send(*fnpb.StateRequest) error
+	Recv() (*fnpb.StateResponse, error)
 }
 
 // StateChannel manages state transactions over a single gRPC connection.
@@ -254,10 +254,10 @@ type StateChannel struct {
 	id     string
 	client stateClient
 
-	requests      chan *pb.StateRequest
+	requests      chan *fnpb.StateRequest
 	nextRequestNo int32
 
-	responses map[string]chan<- *pb.StateResponse
+	responses map[string]chan<- *fnpb.StateResponse
 	mu        sync.Mutex
 
 	// a closure that forces the state manager to recreate this stream.
@@ -285,7 +285,7 @@ func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to connect to state service %v", port.URL)
 	}
-	client, err := pb.NewBeamFnStateClient(cc).State(ctx)
+	client, err := fnpb.NewBeamFnStateClient(cc).State(ctx)
 	if err != nil {
 		cc.Close()
 		return nil, errors.Wrapf(err, "failed to create state client %v", port.URL)
@@ -297,8 +297,8 @@ func makeStateChannel(ctx context.Context, cancelFn context.CancelFunc, id strin
 	ret := &StateChannel{
 		id:        id,
 		client:    client,
-		requests:  make(chan *pb.StateRequest, 10),
-		responses: make(map[string]chan<- *pb.StateResponse),
+		requests:  make(chan *fnpb.StateRequest, 10),
+		responses: make(map[string]chan<- *fnpb.StateResponse),
 		cancelFn:  cancelFn,
 		DoneCh:    ctx.Done(),
 	}
@@ -346,7 +346,7 @@ func (c *StateChannel) write(ctx context.Context) {
 	var err error
 	var id string
 	for {
-		var req *pb.StateRequest
+		var req *fnpb.StateRequest
 		select {
 		case req = <-c.requests:
 		case <-c.DoneCh: // Close the goroutine on context cancel.
@@ -380,16 +380,16 @@ func (c *StateChannel) write(ctx context.Context) {
 	c.terminateStreamOnError(err)
 
 	if ok {
-		ch <- &pb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
+		ch <- &fnpb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
 	}
 }
 
 // Send sends a state request and returns the response.
-func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
+func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error) {
 	id := fmt.Sprintf("r%v", atomic.AddInt32(&c.nextRequestNo, 1))
 	req.Id = id
 
-	ch := make(chan *pb.StateResponse, 1)
+	ch := make(chan *fnpb.StateResponse, 1)
 	c.mu.Lock()
 	if c.closedErr != nil {
 		defer c.mu.Unlock()
@@ -400,7 +400,7 @@ func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
 
 	c.requests <- req
 
-	var resp *pb.StateResponse
+	var resp *fnpb.StateResponse
 	select {
 	case resp = <-ch:
 	case <-c.DoneCh: