You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/04 19:56:10 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #16957: [BEAM-13829] Expose status API from Go SDK Harness

lostluck commented on code in PR #16957:
URL: https://github.com/apache/beam/pull/16957#discussion_r842077050


##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -98,6 +111,18 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 		log.Debugf(ctx, "control response channel closed")
 	}()
 
+	// if the runner supports worker status api then expose SDK harness status
+	if statusEndpoint != "" {
+		statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint)
+		if err != nil {
+			log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
+		} else {
+			statusHandler.wg.Add(1)

Review Comment:
   Consider calling this in the `start` method instead, right before the `go w.reader(...)` call. It can't be called in the `reader` method because that runs on a different goroutine, and there's no guarantee that it will run promptly (though in practice it does). Calling it before starting the goroutine ensures the WaitGroup already has something to wait on by the time `stop` is called, so there is no race.
   
   (Unless I'm missing something, of course, in which case we should have a comment here explaining why we need to do the wg.Add here.)



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+	"testing"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+// BeamFnWorkerStatusServicer is a fake runner-side WorkerStatus service that
+// the SDK's workerStatusHandler connects to in the tests of this file.
+type BeamFnWorkerStatusServicer struct {
+	fnpb.UnimplementedBeamFnWorkerStatusServer
+
+	// response receives the StatusInfo of the first WorkerStatusResponse.
+	response chan string
+
+	// lis is not read or written by any visible code; dialer uses the
+	// package-level lis instead. NOTE(review): consider removing this field.
+	lis *bufconn.Listener
+}
+
+// WorkerStatus sends a single WorkerStatusRequest with Id "1", waits for the
+// SDK's reply, and forwards the reply's StatusInfo on w.response. Failures to
+// send or receive are returned as stream errors.
+func (w *BeamFnWorkerStatusServicer) WorkerStatus(b fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+	if err := b.Send(&fnpb.WorkerStatusRequest{Id: "1"}); err != nil {
+		return fmt.Errorf("error sending request b.send: %w", err)
+	}
+	resp, err := b.Recv()
+	if err != nil {
+		return fmt.Errorf("error receiving response b.recv: %w", err)
+	}
+	// Blocks until the test reads the value when w.response is unbuffered,
+	// as it is when constructed with make(chan string) in this file's test.
+	w.response <- resp.GetStatusInfo()
+	return nil
+}
+
+// buffsize is the size (1 MiB) of the in-memory bufconn listener buffer.
+const buffsize = 1 << 20
+
+// lis is the in-memory listener created by setup and dialed by dialer. It is
+// shared package state, so tests that call setup should not run in parallel.
+var lis *bufconn.Listener
+
+// setup starts a gRPC server on a fresh in-memory bufconn listener (stored in
+// the package-level lis for dialer), registers srv as its WorkerStatus
+// service, and stops the server when t finishes.
+func setup(t *testing.T, srv *BeamFnWorkerStatusServicer) {
+	lis = bufconn.Listen(buffsize)
+	s := grpc.NewServer()
+	fnpb.RegisterBeamFnWorkerStatusServer(s, srv)
+	t.Cleanup(s.Stop)
+	go func() {
+		err := s.Serve(lis)
+		if err == nil {
+			return
+		}
+		log.Fatalf("failed to serve: %v", err)
+	}()
+}
+
+// dialer connects to the in-memory listener created by setup. It ignores its
+// context and address arguments and is passed to grpc.WithContextDialer.
+func dialer(_ context.Context, _ string) (net.Conn, error) {
+	conn, err := lis.Dial()
+	return conn, err
+}
+
+func TestSendStatusResponse(t *testing.T) {
+	ctx := context.Background()
+	srv := &BeamFnWorkerStatusServicer{response: make(chan string)}
+	setup(t, srv)
+	conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(dialer), grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("unable to start test server: %v", err)
+	}
+	statusHandler := workerStatusHandler{conn: conn}
+	statusHandler.wg.Add(1)

Review Comment:
   Don't forget to remove this line once it's been added to start.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status.go:
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+	"context"
+	"io"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+)
+
+// workerStatusHandler holds the state needed to serve the Fn API WorkerStatus
+// stream: the runner connection, a shutdown flag and the reader's WaitGroup.
+type workerStatusHandler struct {
+	// conn is the client connection to the worker status endpoint.
+	conn *grpc.ClientConn
+
+	// shouldShutdown is 0 while running and 1 once shutdown was requested.
+	// It is only accessed through sync/atomic.
+	shouldShutdown int32
+
+	// wg is released by reader when it returns; the visible callers call
+	// wg.Add(1) before start.
+	wg sync.WaitGroup
+}
+
+// newWorkerStatusHandler dials the runner's worker status endpoint, passing
+// 60*time.Second to dial (presumably a connection timeout — confirm), and
+// returns an unstarted handler for it. Call start to begin serving requests.
+func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) {
+	sconn, err := dial(ctx, endpoint, 60*time.Second)
+	if err != nil {
+		// No trailing newline: error strings are embedded in log lines by callers.
+		return nil, errors.Wrapf(err, "failed to connect: %v", endpoint)
+	}
+	// The zero value of shouldShutdown (0) means "alive".
+	return &workerStatusHandler{conn: sconn}, nil
+}
+
+// isAlive reports whether shutdown has not yet been requested.
+func (w *workerStatusHandler) isAlive() bool {
+	stopped := atomic.LoadInt32(&w.shouldShutdown) != 0
+	return !stopped
+}
+
+func (w *workerStatusHandler) shutdown() {
+	atomic.StoreInt32(&w.shouldShutdown, 1)
+}
+
+// start opens the WorkerStatus stream on w.conn and launches reader on a new
+// goroutine to answer incoming WorkerStatusRequests. If the stream cannot be
+// opened, the error is logged and no goroutine is started.
+//
+// NOTE(review): reader calls w.wg.Done on exit, and the visible callers call
+// w.wg.Add(1) before start. On the error path below that count is never
+// released, so a later w.wg.Wait (presumably in stop) would block. Consider
+// moving the Add into start, just before the go statement, and removing it
+// from the callers.
+func (w *workerStatusHandler) start(ctx context.Context) {
+	stub, err := fnpb.NewBeamFnWorkerStatusClient(w.conn).WorkerStatus(ctx)
+	if err != nil {
+		log.Errorf(ctx, "status client not established: %v", err)
+		return
+	}
+	go w.reader(ctx, stub)
+}
+
+// reader receives WorkerStatusRequests from stub and answers each one on the
+// same stream with a WorkerStatusResponse whose StatusInfo is a dump of all
+// goroutine stacks. It returns when Recv fails, or once shutdown has been
+// called (checked before each Recv). It calls w.wg.Done when it returns.
+func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
+	defer w.wg.Done()
+	// Reused across requests; runtime.Stack truncates dumps longer than this.
+	buf := make([]byte, 1<<16)
+	for w.isAlive() {
+		req, err := stub.Recv()
+		if err != nil {
+			log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
+			return
+		}
+		log.Debugf(ctx, "RECV-status: %v", req.GetId())
+		// Only buf[:n] belongs to this dump; the remainder is zero padding or
+		// leftover bytes from an earlier, longer dump.
+		n := runtime.Stack(buf, true)
+		response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf[:n])}
+		if err := stub.Send(response); err != nil && err != io.EOF {
+			log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err)
+		}
+	}
+}
+
+// stop stops the reader and closes worker status endpoint connection with the runner.
+func (w *workerStatusHandler) stop(ctx context.Context) {

Review Comment:
   Technically, since we're only calling this in the tests (which is fine, it's good to have), we should have it return the error as well as log it (just in case).
   
   This way the test can simply verify that the returned error is nil, and it'll be properly covered.



##########
sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+	"testing"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+// BeamFnWorkerStatusServicer is a fake runner-side WorkerStatus service that
+// the SDK's workerStatusHandler connects to in the tests of this file.
+type BeamFnWorkerStatusServicer struct {
+	fnpb.UnimplementedBeamFnWorkerStatusServer
+
+	// response receives the StatusInfo of the first WorkerStatusResponse.
+	response chan string
+
+	// lis is not read or written by any visible code; dialer uses the
+	// package-level lis instead. NOTE(review): consider removing this field.
+	lis *bufconn.Listener
+}
+
+// WorkerStatus sends a single WorkerStatusRequest with Id "1", waits for the
+// SDK's reply, and forwards the reply's StatusInfo on w.response. Failures to
+// send or receive are returned as stream errors.
+func (w *BeamFnWorkerStatusServicer) WorkerStatus(b fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+	if err := b.Send(&fnpb.WorkerStatusRequest{Id: "1"}); err != nil {
+		return fmt.Errorf("error sending request b.send: %w", err)
+	}
+	resp, err := b.Recv()
+	if err != nil {
+		return fmt.Errorf("error receiving response b.recv: %w", err)
+	}
+	// Blocks until the test reads the value when w.response is unbuffered,
+	// as it is when constructed with make(chan string) in this file's test.
+	w.response <- resp.GetStatusInfo()
+	return nil
+}
+
+// buffsize is the size (1 MiB) of the in-memory bufconn listener buffer.
+const buffsize = 1 << 20
+
+// lis is the in-memory listener created by setup and dialed by dialer. It is
+// shared package state, so tests that call setup should not run in parallel.
+var lis *bufconn.Listener
+
+// setup starts a gRPC server on a fresh in-memory bufconn listener (stored in
+// the package-level lis for dialer), registers srv as its WorkerStatus
+// service, and stops the server when t finishes.
+func setup(t *testing.T, srv *BeamFnWorkerStatusServicer) {
+	lis = bufconn.Listen(buffsize)
+	s := grpc.NewServer()
+	fnpb.RegisterBeamFnWorkerStatusServer(s, srv)
+	t.Cleanup(s.Stop)
+	go func() {
+		err := s.Serve(lis)
+		if err == nil {
+			return
+		}
+		log.Fatalf("failed to serve: %v", err)
+	}()
+}
+
+// dialer connects to the in-memory listener created by setup. It ignores its
+// context and address arguments and is passed to grpc.WithContextDialer.
+func dialer(_ context.Context, _ string) (net.Conn, error) {
+	conn, err := lis.Dial()
+	return conn, err
+}
+
+func TestSendStatusResponse(t *testing.T) {
+	ctx := context.Background()
+	srv := &BeamFnWorkerStatusServicer{response: make(chan string)}
+	setup(t, srv)
+	conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(dialer), grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("unable to start test server: %v", err)
+	}
+	statusHandler := workerStatusHandler{conn: conn}
+	statusHandler.wg.Add(1)
+	statusHandler.start(ctx)
+	t.Cleanup(func() {
+		statusHandler.stop(ctx)

Review Comment:
   I thought I mentioned this in my previous review, but I apparently didn't.
   
   We shouldn't be calling the stop here in Cleanup. We need to have a clearer signal that the stop actually happens.
   
   Call it after checking the response, and then verify that `shouldShutdown` is set to 1.
   
   Effectively, because Cleanup isn't called until *after* the test has finished, it's not considered part of the test. So we need to call `stop` inline in the test and then verify its behavior (technically, by the test not timing out, because otherwise the WaitGroup will block forever).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org