You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 05:38:41 UTC
[beam] branch lostluck-protosuffix updated: Update statemgr.go
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
new e1a2730 Update statemgr.go
e1a2730 is described below
commit e1a2730a3adc573dec73f2bc6e0cbecc3db524c8
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:38:30 2020 -0700
Update statemgr.go
---
sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 46 +++++++++++------------
1 file changed, 23 insertions(+), 23 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 4c0e7f5..3ba9d27 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -26,7 +26,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/go/pkg/beam/log"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
)
@@ -104,7 +104,7 @@ func (s *ScopedStateReader) Close() error {
type stateKeyReader struct {
instID instructionID
- key *pb.StateKey
+ key *fnpb.StateKey
token []byte
buf []byte
@@ -116,9 +116,9 @@ type stateKeyReader struct {
}
func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string, instID instructionID, k, w []byte) *stateKeyReader {
- key := &pb.StateKey{
- Type: &pb.StateKey_MultimapSideInput_{
- MultimapSideInput: &pb.StateKey_MultimapSideInput{
+ key := &fnpb.StateKey{
+ Type: &fnpb.StateKey_MultimapSideInput_{
+ MultimapSideInput: &fnpb.StateKey_MultimapSideInput{
TransformId: id.PtransformID,
SideInputId: sideInputID,
Window: w,
@@ -134,9 +134,9 @@ func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string,
}
func newRunnerReader(ch *StateChannel, instID instructionID, k []byte) *stateKeyReader {
- key := &pb.StateKey{
- Type: &pb.StateKey_Runner_{
- Runner: &pb.StateKey_Runner{
+ key := &fnpb.StateKey{
+ Type: &fnpb.StateKey_Runner_{
+ Runner: &fnpb.StateKey_Runner{
Key: k,
},
},
@@ -164,12 +164,12 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) {
localChannel := r.ch
r.mu.Unlock()
- req := &pb.StateRequest{
+ req := &fnpb.StateRequest{
// Id: set by StateChannel
InstructionId: string(r.instID),
StateKey: r.key,
- Request: &pb.StateRequest_Get{
- Get: &pb.StateGetRequest{
+ Request: &fnpb.StateRequest_Get{
+ Get: &fnpb.StateGetRequest{
ContinuationToken: r.token,
},
},
@@ -243,8 +243,8 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
}
type stateClient interface {
- Send(*pb.StateRequest) error
- Recv() (*pb.StateResponse, error)
+ Send(*fnpb.StateRequest) error
+ Recv() (*fnpb.StateResponse, error)
}
// StateChannel manages state transactions over a single gRPC connection.
@@ -254,10 +254,10 @@ type StateChannel struct {
id string
client stateClient
- requests chan *pb.StateRequest
+ requests chan *fnpb.StateRequest
nextRequestNo int32
- responses map[string]chan<- *pb.StateResponse
+ responses map[string]chan<- *fnpb.StateResponse
mu sync.Mutex
// a closure that forces the state manager to recreate this stream.
@@ -285,7 +285,7 @@ func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to state service %v", port.URL)
}
- client, err := pb.NewBeamFnStateClient(cc).State(ctx)
+ client, err := fnpb.NewBeamFnStateClient(cc).State(ctx)
if err != nil {
cc.Close()
return nil, errors.Wrapf(err, "failed to create state client %v", port.URL)
@@ -297,8 +297,8 @@ func makeStateChannel(ctx context.Context, cancelFn context.CancelFunc, id strin
ret := &StateChannel{
id: id,
client: client,
- requests: make(chan *pb.StateRequest, 10),
- responses: make(map[string]chan<- *pb.StateResponse),
+ requests: make(chan *fnpb.StateRequest, 10),
+ responses: make(map[string]chan<- *fnpb.StateResponse),
cancelFn: cancelFn,
DoneCh: ctx.Done(),
}
@@ -346,7 +346,7 @@ func (c *StateChannel) write(ctx context.Context) {
var err error
var id string
for {
- var req *pb.StateRequest
+ var req *fnpb.StateRequest
select {
case req = <-c.requests:
case <-c.DoneCh: // Close the goroutine on context cancel.
@@ -380,16 +380,16 @@ func (c *StateChannel) write(ctx context.Context) {
c.terminateStreamOnError(err)
if ok {
- ch <- &pb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
+ ch <- &fnpb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
}
}
// Send sends a state request and returns the response.
-func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
+func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error) {
id := fmt.Sprintf("r%v", atomic.AddInt32(&c.nextRequestNo, 1))
req.Id = id
- ch := make(chan *pb.StateResponse, 1)
+ ch := make(chan *fnpb.StateResponse, 1)
c.mu.Lock()
if c.closedErr != nil {
defer c.mu.Unlock()
@@ -400,7 +400,7 @@ func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
c.requests <- req
- var resp *pb.StateResponse
+ var resp *fnpb.StateResponse
select {
case resp = <-ch:
case <-c.DoneCh: