You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/08 04:59:41 UTC

[beam] branch lostluck-protosuffix created (now 3f92f32)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 3f92f32  Update session.go

This branch includes the following new commits:

     new 3f92f32  Update session.go

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Update session.go

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3f92f32e88686a6e8e9b78fc0a8509322d2c4fab
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Apr 7 21:59:24 2020 -0700

    Update session.go
    
    Make controlServer actually implement the control server interface.
    Move the import name to match shorter convention.
---
 sdks/go/pkg/beam/runners/session/session.go | 30 ++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go
index d814882..f917bc0 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -34,8 +34,8 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
-	fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-	rapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
 	"google.golang.org/grpc"
 )
@@ -56,13 +56,13 @@ var sessionFile = flag.String("session_file", "", "Session file for the runner")
 type controlServer struct {
 	filename   string
 	wg         *sync.WaitGroup // used to signal when the session is completed
-	ctrlStream fnapi_pb.BeamFnControl_ControlServer
+	ctrlStream fnpb.BeamFnControl_ControlServer
 	dataServer *grpc.Server
-	dataStream fnapi_pb.BeamFnData_DataServer
+	dataStream fnapb.BeamFnData_DataServer
 	dwg        *sync.WaitGroup
 }
 
-func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) error {
+func (c *controlServer) Control(stream fnpb.BeamFnControl_ControlServer) error {
 	fmt.Println("Go SDK connected")
 	c.ctrlStream = stream
 	// We have a connected worker. Start reading the session file and issuing
@@ -74,6 +74,10 @@ func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) err
 	return nil
 }
 
+func (c *controlServer) GetProcessBundleDescriptor(ctx context.Context, r *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	return nil, nil
+}
+
 func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	if c.dataServer != nil {
 		// Already a data server, we're done
@@ -87,7 +91,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	// the data server is listening on.
 
 	c.dataServer = grpc.NewServer()
-	fnapi_pb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
+	fnpb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
 	dp, err := net.Listen("tcp", tcpPort)
 	if err != nil {
 		panic(err)
@@ -97,7 +101,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
 	go c.dataServer.Serve(dp)
 }
 
-func (c *controlServer) registerStream(stream fnapi_pb.BeamFnData_DataServer) {
+func (c *controlServer) registerStream(stream fnpb.BeamFnData_DataServer) {
 	c.dataStream = stream
 	c.dwg.Done()
 }
@@ -194,8 +198,8 @@ func (c *controlServer) handleEntry(msg *session.Entry) {
 	}
 }
 
-func extractPortSpec(spec *rapi_pb.FunctionSpec) string {
-	var port fnapi_pb.RemoteGrpcPort
+func extractPortSpec(spec *pipepb.FunctionSpec) string {
+	var port fnpb.RemoteGrpcPort
 	if err := proto.Unmarshal(spec.GetPayload(), &port); err != nil {
 		panic(err)
 	}
@@ -213,7 +217,7 @@ type dataServer struct {
 	ctrl *controlServer
 }
 
-func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
+func (d *dataServer) Data(stream fnpb.BeamFnData_DataServer) error {
 	// This goroutine is only used for reading data. The stream object
 	// is passed to the control server so that all data is sent from
 	// a single goroutine to ensure proper ordering.
@@ -239,7 +243,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
 // loggingServer manages the FnAPI logging channel.
 type loggingServer struct{} // no data content
 
-func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error {
+func (l *loggingServer) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
 	// This stream object is only used here. The stream is used for receiving, and
 	// no sends happen on it.
 	for {
@@ -269,7 +273,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	// Start up the grpc logging service.
 	ls := grpc.NewServer()
-	fnapi_pb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
+	fnpb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
 	logPort, err := net.Listen("tcp", ":0")
 	if err != nil {
 		panic("No logging port")
@@ -282,7 +286,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	wg.Add(1)
 
 	cs := grpc.NewServer()
-	fnapi_pb.RegisterBeamFnControlServer(cs, &controlServer{
+	fnpb.RegisterBeamFnControlServer(cs, &controlServer{
 		filename: *sessionFile,
 		wg:       &wg,
 	})