Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/18 00:29:54 UTC

[GitHub] [beam] youngoli commented on a change in pull request #16957: [BEAM-13829] Expose status API from Go SDK Harness

youngoli commented on a change in pull request #16957:
URL: https://github.com/apache/beam/pull/16957#discussion_r829487283



##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -36,14 +36,27 @@ import (
 	"google.golang.org/grpc"
 )
 
+// StatusAddress is a type of status endpoint address as an optional argument to harness.Main().
+type StatusAddress string
+
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
 // "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.
-func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
+func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error {

Review comment:
       This changes the signature of an exported function, which we usually want to avoid. On the other hand, the harness package is very unlikely to be used by end users and is meant only for use within Beam, so I think this should be fine. Just to be sure, though, we should run this by @lostluck.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status.go
##########
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       Nit: You could probably shorten the filename to status.go (and status_test.go). Not something I'll block on, though.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -98,6 +111,18 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 		log.Debugf(ctx, "control response channel closed")
 	}()
 
+	// if the runner supports worker status api then expose SDK harness status
+	if statusEndpoint != "" {
+		statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint)
+		if err != nil {
+			log.Error(ctx, err)
+		}
+		var wg sync.WaitGroup

Review comment:
       Just checking: is this meant to be a different WaitGroup from the one declared outside this scope? If so, I think it would be clearer to rename it (even something like "wg2" would be fine, or "swg" for "status wait group"). That makes it explicit that this is a separate variable and not the one declared above.
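To illustrate the naming point, here is a hypothetical sketch (`runWorkers` and the event strings are made up, not taken from the PR) with an outer WaitGroup and a distinctly named one for the status goroutine. Go accepts a redeclared `wg` in the inner block without complaint, which is what makes an accidental shadow easy to miss.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical sketch of the pattern under review. The outer
// wg tracks the main goroutine; when status reporting is enabled, a separately
// named swg tracks only the status goroutine. Writing "var wg sync.WaitGroup"
// inside the if block instead would compile but silently shadow the outer wg.
func runWorkers(statusEnabled bool) []string {
	var mu sync.Mutex
	var events []string
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, s)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		record("control")
	}()

	if statusEnabled {
		var swg sync.WaitGroup // status wait group: clearly not the outer wg
		swg.Add(1)
		go func() {
			defer swg.Done()
			record("status")
		}()
		swg.Wait()
	}

	wg.Wait()
	return events
}

func main() {
	fmt.Println(len(runWorkers(false)), len(runWorkers(true)))
}
```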

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package harness
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"net"
+	"sync"
+	"testing"
+
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+)
+
+type BeamFnWorkerStatusServicer struct {
+	fnpb.UnimplementedBeamFnWorkerStatusServer
+	response chan string
+}
+
+func (w *BeamFnWorkerStatusServicer) WorkerStatus(b fnpb.BeamFnWorkerStatus_WorkerStatusServer) error {
+	b.Send(&fnpb.WorkerStatusRequest{Id: "1"})
+	resp, err := b.Recv()
+	if err != nil {
+		return fmt.Errorf("error receiving response b.recv: %v", err)
+	}
+	w.response <- resp.GetStatusInfo()
+	return nil
+}
+
+func setUp(port string, srv *BeamFnWorkerStatusServicer) {
+	l, err := net.Listen("tcp", ":9000")

Review comment:
       Can you use port 0 instead of the hard-coded 9000, so that it listens on a random available port? As written, this test could be flaky due to port conflicts.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,77 @@
+func setUp(port string, srv *BeamFnWorkerStatusServicer) {

Review comment:
       This function doesn't actually use the port passed into it; that looks like an oversight.

##########
File path: sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
##########
@@ -0,0 +1,77 @@
+type BeamFnWorkerStatusServicer struct {

Review comment:
       This is less a comment for you and more a question for @lostluck: I've noticed a few unit tests now that start up real gRPC servers to test our gRPC files. Should we transition to an in-memory listener package like bufconn, as in this Stack Overflow question: https://stackoverflow.com/questions/42102496/testing-a-grpc-service ?

       To be clear, even if the answer is yes, we can save it for a subsequent PR. This isn't something to address in this PR either way.
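For reference, bufconn (`google.golang.org/grpc/test/bufconn` in the gRPC-Go module) is an in-memory `net.Listener`: the gRPC server serves on it and the client connects through a custom dialer (`grpc.WithContextDialer`), so no real port is opened. The sketch below reproduces only that core idea with the standard library's `net.Pipe`, so it runs without extra dependencies. `pipeListener` and `echo` are hypothetical names, and this is not bufconn's implementation or API.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
)

// pipeListener is a stdlib-only stand-in for the bufconn idea: a net.Listener
// whose connections are in-memory net.Pipe pairs, so no TCP port is used.
type pipeListener struct {
	conns  chan net.Conn
	closed chan struct{}
	once   sync.Once
}

var _ net.Listener = (*pipeListener)(nil) // compile-time interface check

var errClosed = errors.New("pipe listener closed")

func newPipeListener() *pipeListener {
	return &pipeListener{conns: make(chan net.Conn), closed: make(chan struct{})}
}

// Accept waits for a Dial and returns the server end of the pipe.
func (l *pipeListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.closed:
		return nil, errClosed
	}
}

func (l *pipeListener) Close() error {
	l.once.Do(func() { close(l.closed) })
	return nil
}

func (l *pipeListener) Addr() net.Addr { return pipeAddr{} }

// Dial creates an in-memory connection and hands its server end to Accept.
func (l *pipeListener) Dial() (net.Conn, error) {
	client, server := net.Pipe()
	select {
	case l.conns <- server:
		return client, nil
	case <-l.closed:
		client.Close()
		server.Close()
		return nil, errClosed
	}
}

type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string  { return "pipe" }

// echo starts a one-shot echo server on l, dials it, and returns the reply.
// msg must be non-empty, because net.Pipe writes are synchronous.
func echo(l *pipeListener, msg string) (string, error) {
	go func() {
		c, err := l.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		buf := make([]byte, len(msg))
		if _, err := io.ReadFull(c, buf); err == nil {
			c.Write(buf)
		}
	}()
	c, err := l.Dial()
	if err != nil {
		return "", err
	}
	defer c.Close()
	if _, err := c.Write([]byte(msg)); err != nil {
		return "", err
	}
	reply := make([]byte, len(msg))
	if _, err := io.ReadFull(c, reply); err != nil {
		return "", err
	}
	return string(reply), nil
}

func main() {
	l := newPipeListener()
	defer l.Close()
	fmt.Println(echo(l, "status"))
}
```

Serving tests over a listener like this also sidesteps the port-conflict flakiness raised about `:9000`.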




-- 
This is an automated message from the Apache Git Service.