You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/02/19 19:32:50 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #25478: [#24789][prism] internal/worker + tentative data

lostluck commented on code in PR #25478:
URL: https://github.com/apache/beam/pull/25478#discussion_r1111307233


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO actually quick process the data to windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}

Review Comment:
   This is partly due to an earlier iteration of the design.
   
   Generally, it's a [Go Proverb](https://go-proverbs.github.io/) that it's ideal to [make the zero value useful](https://www.youtube.com/watch?v=PAAkCSZUG1c&t=385s), especially when others are expected to use the type. It simplifies initialization of the type, particularly in larger data structures.
   
   It's less critical when its scope is limited to a single package or project, but it's certainly a goal to strive toward.
   
   If something may not be used, then the optimal thing is to never create it: it never puts pressure on the heap and never needs to be garbage collected. In TentativeData, a bundle may not produce any output at all, and this is unknowable at construction time. So it's cheap enough to check whether it exists, construct it on demand, and then carry on identically.
   
   On the other hand, if something is going to be used 100% of the time, it's usually better to provide a New function or an Init method to ensure everything is ready for downstream use. Choosing between those usually depends on how many internal fields exist. For example, the B type is currently manipulated largely by its client package via its fields. A NewB function would provide compile-time safety about what must be supplied for initialization, but it isn't yet clearly the best approach.
   
   We can revisit these choices because the only consumers of these packages are part of the prism runner. Such changes therefore don't risk breaking unknown users, as long as the runner keeps working.
   
   As you can see, the right approach is situational. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org