You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 04:59:42 UTC

[beam] 01/01: Update session.go

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3f92f32e88686a6e8e9b78fc0a8509322d2c4fab
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 21:59:24 2020 -0700

    Update session.go
    
    Make controlServer actually implement the control server interface.
    Rename the import aliases to match the shorter convention.
---
 sdks/go/pkg/beam/runners/session/session.go | 30 ++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go
index d814882..f917bc0 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -34,8 +34,8 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
-	fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-	rapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
 )
@@ -56,13 +56,13 @@ var sessionFile = flag.String("session_file", "", "Session file for the runner")
 type controlServer struct {
 	filename   string
 	wg         *sync.WaitGroup // used to signal when the session is completed
-	ctrlStream fnapi_pb.BeamFnControl_ControlServer
+	ctrlStream fnpb.BeamFnControl_ControlServer
 	dataServer *grpc.Server
-	dataStream fnapi_pb.BeamFnData_DataServer
+	dataStream fnpb.BeamFnData_DataServer
 	dwg        *sync.WaitGroup
 }
 
-func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) error {
+func (c *controlServer) Control(stream fnpb.BeamFnControl_ControlServer) error {
 	fmt.Println("Go SDK connected")
 	c.ctrlStream = stream
 	// We have a connected worker. Start reading the session file and issuing
@@ -74,6 +74,10 @@ func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) err
 	return nil
 }
 
+func (c *controlServer) GetProcessBundleDescriptor(ctx context.Context, r *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	return nil, nil
+}
+
 func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	if c.dataServer != nil {
 		// Already a data server, we're done
@@ -87,7 +91,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	// the data server is listening on.
 
 	c.dataServer = grpc.NewServer()
-	fnapi_pb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
+	fnpb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
 	dp, err := net.Listen("tcp", tcpPort)
 	if err != nil {
 		panic(err)
@@ -97,7 +101,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	go c.dataServer.Serve(dp)
 }
 
-func (c *controlServer) registerStream(stream fnapi_pb.BeamFnData_DataServer) {
+func (c *controlServer) registerStream(stream fnpb.BeamFnData_DataServer) {
 	c.dataStream = stream
 	c.dwg.Done()
 }
@@ -194,8 +198,8 @@ func (c *controlServer) handleEntry(msg *session.Entry) {
 	}
 }
 
-func extractPortSpec(spec *rapi_pb.FunctionSpec) string {
-	var port fnapi_pb.RemoteGrpcPort
+func extractPortSpec(spec *pipepb.FunctionSpec) string {
+	var port fnpb.RemoteGrpcPort
 	if err := proto.Unmarshal(spec.GetPayload(), &port); err != nil {
 		panic(err)
 	}
@@ -213,7 +217,7 @@ type dataServer struct {
 	ctrl *controlServer
 }
 
-func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
+func (d *dataServer) Data(stream fnpb.BeamFnData_DataServer) error {
 	// This goroutine is only used for reading data. The stream object
 	// is passed to the control server so that all data is sent from
 	// a single goroutine to ensure proper ordering.
@@ -239,7 +243,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
 // loggingServer manages the FnAPI logging channel.
 type loggingServer struct{} // no data content
 
-func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error {
+func (l *loggingServer) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
 	// This stream object is only used here. The stream is used for receiving, and
 	// no sends happen on it.
 	for {
@@ -269,7 +273,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	// Start up the grpc logging service.
 	ls := grpc.NewServer()
-	fnapi_pb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
+	fnpb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
 	logPort, err := net.Listen("tcp", ":0")
 	if err != nil {
 		panic("No logging port")
@@ -282,7 +286,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	wg.Add(1)
 
 	cs := grpc.NewServer()
-	fnapi_pb.RegisterBeamFnControlServer(cs, &controlServer{
+	fnpb.RegisterBeamFnControlServer(cs, &controlServer{
 		filename: *sessionFile,
 		wg:       &wg,
 	})