You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/02/15 05:34:04 UTC

[GitHub] [beam] lostluck opened a new pull request, #25478: [#24789][prism] internal/worker + tentative data

lostluck opened a new pull request, #25478:
URL: https://github.com/apache/beam/pull/25478

   Add the GRPC FnAPI implementation that sits between SDK harnesses and the runner on workers.
   
   For Prism's initial implementation.
   
   See #24789.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1435724546

   R: @johannaojeling 
   
   Would you mind helping me usher in the Prism runner before the 2.46 cut?
   
   I'd love for it to be in a usable state in that release, and to get it so others can possible help complete it sooner.
   
   In particular for reviews at this stage, it would be great to get doc/ code ordering changes so it's not entirely confusing to readers. New functionality/bug fixes will become TODOs though, rather than completing them, since they may have strong follow on effects to the branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1435724745

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck merged pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #25478:
URL: https://github.com/apache/beam/pull/25478


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johannaojeling commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "johannaojeling (via GitHub)" <gi...@apache.org>.
johannaojeling commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1435769487

   Of course, I'll have a look at this tomorrow! (UTC+1)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johannaojeling commented on a diff in pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "johannaojeling (via GitHub)" <gi...@apache.org>.
johannaojeling commented on code in PR #25478:
URL: https://github.com/apache/beam/pull/25478#discussion_r1111310249


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO actually quick process the data to windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}

Review Comment:
   Ok, understood. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johannaojeling commented on a diff in pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "johannaojeling (via GitHub)" <gi...@apache.org>.
johannaojeling commented on code in PR #25478:
URL: https://github.com/apache/beam/pull/25478#discussion_r1111262544


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"

Review Comment:
   ```suggestion
   	"bytes"
   	"context"
   	"fmt"
   	"io"
   	"net"
   	"sync"
   	"sync/atomic"
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //

Review Comment:
   ```suggestion
   		return slog.Level(-8)
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()

Review Comment:
   Perhaps rename `ikey` -> `iKey` and `mmkey` -> `mmKey` for consistency with the other variables?



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
+	w := New("test")
+
+	id := "available"
+	w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
+		Id: id,
+	}
+
+	pbd, err := w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: id,
+	})
+	if err != nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) error: %v, want nil", id, err)
+	}
+	if got, want := pbd.GetId(), id; got != want {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want id %v", id, got, want)
+	}
+
+	pbd, err = w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: "unknown",
+	})
+	if err == nil {
+		t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
+	}
+}
+
+func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {
+	t.Helper()
+	ctx, cancelFn := context.WithCancel(context.Background())
+	t.Cleanup(cancelFn)
+
+	w := New("test")
+	lis := bufconn.Listen(2048)
+	w.lis = lis
+	t.Cleanup(func() { w.Stop() })
+	go w.Serve()
+
+	clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+		return lis.DialContext(ctx)
+	}), grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		t.Fatal("couldn't create bufconn grpc connection:", err)
+	}
+	return ctx, w, clientConn
+}
+
+func TestWorker_Logging(t *testing.T) {
+	ctx, _, clientConn := serveTestWorker(t)
+
+	logCli := fnpb.NewBeamFnLoggingClient(clientConn)
+	logStream, err := logCli.Logging(ctx)
+	if err != nil {
+		t.Fatal("couldn't create log client:", err)
+	}
+
+	logStream.Send(&fnpb.LogEntry_List{
+		LogEntries: []*fnpb.LogEntry{{
+			Severity: fnpb.LogEntry_Severity_INFO,
+			Message:  "squeamish ossiphrage",
+		}},
+	})
+
+	// TODO: Connect to the job management service.
+	// At this point job messages are just logged to whereever the prism runner executes
+	// But this should pivot to anyone connecting to the Job Management service for the
+	// job.
+	// In the meantime, sleep to validate execution via coverage.
+	time.Sleep(20 * time.Millisecond)
+}
+
+func TestWorker_Control_HappyPath(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	ctrlCli := fnpb.NewBeamFnControlClient(clientConn)
+	ctrlStream, err := ctrlCli.Control(ctx)
+	if err != nil {
+		t.Fatal("couldn't create control client:", err)
+	}
+
+	instID := wk.NextInst()
+
+	b := &B{}
+	b.Init()
+	wk.bundles[instID] = b
+	b.ProcessOn(wk)
+
+	ctrlStream.Send(&fnpb.InstructionResponse{
+		InstructionId: instID,
+		Response: &fnpb.InstructionResponse_ProcessBundle{
+			ProcessBundle: &fnpb.ProcessBundleResponse{
+				RequiresFinalization: true, // Simple thing to check.
+			},
+		},
+	})
+
+	if err := ctrlStream.CloseSend(); err != nil {
+		t.Errorf("ctrlStream.CloseSend() = %v", err)
+	}
+	resp := <-b.Resp
+
+	if !resp.RequiresFinalization {
+		t.Errorf("got %v, want response that Requires Finalization", resp)
+	}
+}
+
+func TestWorker_Data_HappyPath(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	dataCli := fnpb.NewBeamFnDataClient(clientConn)
+	dataStream, err := dataCli.Data(ctx)
+	if err != nil {
+		t.Fatal("couldn't create data client:", err)
+	}
+
+	instID := wk.NextInst()
+
+	b := &B{
+		InstID: instID,
+		PBDID:  wk.NextStage(),
+		InputData: [][]byte{
+			{1, 1, 1, 1, 1, 1},
+		},
+		OutputCount: 1,
+	}
+	b.Init()
+	wk.bundles[instID] = b
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		b.ProcessOn(wk)
+	}()
+
+	wk.InstReqs <- &fnpb.InstructionRequest{
+		InstructionId: instID,
+	}
+
+	elements, err := dataStream.Recv()
+	if err != nil {
+		t.Fatal("couldn't receive data elements:", err)
+	}
+
+	if got, want := elements.GetData()[0].GetInstructionId(), b.InstID; got != want {
+		t.Fatalf("couldn't receive data elements ID: got %v, want %v", got, want)
+	}
+	if got, want := elements.GetData()[0].GetData(), []byte{1, 1, 1, 1, 1, 1}; !bytes.Equal(got, want) {
+		t.Fatalf("client Data received %v, want %v", got, want)
+	}
+	if got, want := elements.GetData()[0].GetIsLast(), true; got != want {
+		t.Fatalf("client Data received wasn't last: got %v, want %v", got, want)
+	}
+
+	dataStream.Send(elements)
+
+	if err := dataStream.CloseSend(); err != nil {
+		t.Errorf("ctrlStream.CloseSend() = %v", err)
+	}
+
+	wg.Wait()
+	t.Log("ProcessOn successfully exited")
+}
+
+func TestWorker_State_Iterable(t *testing.T) {
+	ctx, wk, clientConn := serveTestWorker(t)
+
+	stateCli := fnpb.NewBeamFnStateClient(clientConn)
+	stateStream, err := stateCli.State(ctx)
+	if err != nil {
+		t.Fatal("couldn't create state client:", err)
+	}
+
+	instID := wk.NextInst()
+	wk.bundles[instID] = &B{
+		IterableSideInputData: map[string]map[string]map[typex.Window][][]byte{
+			"transformID": {
+				"i1": {
+					window.GlobalWindow{}: [][]byte{
+						{42},
+					},
+				},
+			},
+		},
+	}
+
+	stateStream.Send(&fnpb.StateRequest{
+		Id:            "first",
+		InstructionId: instID,
+		Request: &fnpb.StateRequest_Get{
+			Get: &fnpb.StateGetRequest{},
+		},
+		StateKey: &fnpb.StateKey{Type: &fnpb.StateKey_IterableSideInput_{
+			IterableSideInput: &fnpb.StateKey_IterableSideInput{
+				TransformId: "transformID",
+				SideInputId: "i1",
+				Window:      []byte{}, // Global Windows
+			},
+		}},
+	})
+
+	resp, err := stateStream.Recv()
+	if err != nil {
+		t.Fatal("couldn't receive state response:", err)
+	}
+
+	if got, want := resp.GetId(), "first"; got != want {
+		t.Fatalf("didn't receive expected state response: got %v, want %v", got, want)
+	}
+
+	if got, want := resp.GetGet().GetData(), []byte{42}; !bytes.Equal(got, want) {
+		t.Fatalf("didn't receive expected state response data: got %v, want %v", got, want)
+	}
+	resp.GetId()
+

Review Comment:
   ```suggestion
   
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {

Review Comment:
   ```suggestion
   func TestWorker_NextStage(t *testing.T) {
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
+	w := New("test")
+
+	id := "available"
+	w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
+		Id: id,
+	}
+
+	pbd, err := w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: id,
+	})
+	if err != nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) error: %v, want nil", id, err)
+	}
+	if got, want := pbd.GetId(), id; got != want {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want id %v", id, got, want)
+	}
+
+	pbd, err = w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: "unknown",
+	})
+	if err == nil {
+		t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
+	}
+}
+
+func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {
+	t.Helper()
+	ctx, cancelFn := context.WithCancel(context.Background())
+	t.Cleanup(cancelFn)
+
+	w := New("test")
+	lis := bufconn.Listen(2048)
+	w.lis = lis
+	t.Cleanup(func() { w.Stop() })
+	go w.Serve()
+
+	clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+		return lis.DialContext(ctx)
+	}), grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		t.Fatal("couldn't create bufconn grpc connection:", err)
+	}
+	return ctx, w, clientConn
+}
+
+func TestWorker_Logging(t *testing.T) {
+	ctx, _, clientConn := serveTestWorker(t)
+
+	logCli := fnpb.NewBeamFnLoggingClient(clientConn)
+	logStream, err := logCli.Logging(ctx)
+	if err != nil {
+		t.Fatal("couldn't create log client:", err)
+	}
+
+	logStream.Send(&fnpb.LogEntry_List{
+		LogEntries: []*fnpb.LogEntry{{
+			Severity: fnpb.LogEntry_Severity_INFO,
+			Message:  "squeamish ossiphrage",
+		}},
+	})
+
+	// TODO: Connect to the job management service.
+	// At this point job messages are just logged to whereever the prism runner executes

Review Comment:
   ```suggestion
   	// At this point job messages are just logged to wherever the prism runner executes
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO actually quick process the data to windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}

Review Comment:
   Just curious to learn the motivation to initialize the map here (same for `engine.TentativeData`) vs in a `New` function as is done e.g. for the maps in `worker.W`



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)

Review Comment:
   ```suggestion
   		t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want)
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+

Review Comment:
   ```suggestion
   
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
+	w := New("test")
+
+	id := "available"
+	w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
+		Id: id,
+	}
+
+	pbd, err := w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: id,
+	})
+	if err != nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) error: %v, want nil", id, err)
+	}
+	if got, want := pbd.GetId(), id; got != want {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want id %v", id, got, want)
+	}
+
+	pbd, err = w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: "unknown",
+	})
+	if err == nil {
+		t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
+	}
+}
+
+func serveTestWorker(t *testing.T) (context.Context, *W, *grpc.ClientConn) {
+	t.Helper()
+	ctx, cancelFn := context.WithCancel(context.Background())
+	t.Cleanup(cancelFn)
+
+	w := New("test")
+	lis := bufconn.Listen(2048)
+	w.lis = lis
+	t.Cleanup(func() { w.Stop() })
+	go w.Serve()
+
+	clientConn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+		return lis.DialContext(ctx)
+	}), grpc.WithInsecure(), grpc.WithBlock())

Review Comment:
   (deprecated)
   
   ```suggestion
   	}), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO actually quick process the data to windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}
+	}
+	for colID, data := range tent.Raw {
+		d.raw[colID] = append(d.raw[colID], data...)
+	}
+}
+
+// Hack for Side Inputs until watermarks are sorted out.

Review Comment:
   ```suggestion
   // GetAllData is a hack for Side Inputs until watermarks are sorted out.
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go:
##########
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/test/bufconn"
+)
+
+func TestWorker_New(t *testing.T) {
+	w := New("test")
+	if got, want := w.ID, "test"; got != want {
+		t.Errorf("New(%q) = %v, want %v", want, got, want)
+	}
+}
+
+func TestWorker_NextInst(t *testing.T) {
+	w := New("test")
+
+	instIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		instIDs[w.NextInst()] = struct{}{}
+	}
+	if got, want := len(instIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_NextBund(t *testing.T) {
+	w := New("test")
+
+	stageIDs := map[string]struct{}{}
+	for i := 0; i < 100; i++ {
+		stageIDs[w.NextStage()] = struct{}{}
+	}
+	if got, want := len(stageIDs), 100; got != want {
+		t.Errorf("calling w.NextInst() got %v unique ids, want %v", got, want)
+	}
+}
+
+func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
+	w := New("test")
+
+	id := "available"
+	w.Descriptors[id] = &fnpb.ProcessBundleDescriptor{
+		Id: id,
+	}
+
+	pbd, err := w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: id,
+	})
+	if err != nil {
+		t.Errorf("got GetProcessBundleDescriptor(%q) error: %v, want nil", id, err)
+	}
+	if got, want := pbd.GetId(), id; got != want {
+		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want id %v", id, got, want)
+	}
+
+	pbd, err = w.GetProcessBundleDescriptor(context.Background(), &fnpb.GetProcessBundleDescriptorRequest{
+		ProcessBundleDescriptorId: "unknown",
+	})
+	if err == nil {
+		t.Errorf(" GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)

Review Comment:
   ```suggestion
   		t.Errorf("got GetProcessBundleDescriptor(%q) = %v, want error", "unknown", pbd)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1436149770

   These were excellent! Thank you very much. 
   
   When there's nearly 8000 lines to get in (including documentation, tests and test functions), it really adds up, so splitting things up is hopefully going to make it easier to review.
   
   Unfortunately, we're now getting into the complicated stuff.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1435670699

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on a diff in pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25478:
URL: https://github.com/apache/beam/pull/25478#discussion_r1111307233


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -0,0 +1,424 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package worker handles interactions with SDK side workers, representing
+// the worker services, communicating with those services, and SDK environments.
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"sync/atomic"
+
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"golang.org/x/exp/slog"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/prototext"
+)
+
+// A W manages worker environments, sending them work
+// that they're able to execute, and manages the server
+// side handlers for FnAPI RPCs.
+type W struct {
+	fnpb.UnimplementedBeamFnControlServer
+	fnpb.UnimplementedBeamFnDataServer
+	fnpb.UnimplementedBeamFnStateServer
+	fnpb.UnimplementedBeamFnLoggingServer
+
+	ID string
+
+	// Server management
+	lis    net.Listener
+	server *grpc.Server
+
+	// These are the ID sources
+	inst, bund uint64
+
+	// descs map[string]*fnpb.ProcessBundleDescriptor
+
+	InstReqs chan *fnpb.InstructionRequest
+	DataReqs chan *fnpb.Elements
+
+	mu          sync.Mutex
+	bundles     map[string]*B                            // Bundles keyed by InstructionID
+	Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
+
+	D *DataService
+}
+
+// New starts the worker server components of FnAPI Execution.
+func New(id string) *W {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(fmt.Sprintf("failed to listen: %v", err))
+	}
+	var opts []grpc.ServerOption
+	wk := &W{
+		ID:     id,
+		lis:    lis,
+		server: grpc.NewServer(opts...),
+
+		InstReqs: make(chan *fnpb.InstructionRequest, 10),
+		DataReqs: make(chan *fnpb.Elements, 10),
+
+		bundles:     make(map[string]*B),
+		Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
+
+		D: &DataService{},
+	}
+	slog.Info("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
+	fnpb.RegisterBeamFnControlServer(wk.server, wk)
+	fnpb.RegisterBeamFnDataServer(wk.server, wk)
+	fnpb.RegisterBeamFnLoggingServer(wk.server, wk)
+	fnpb.RegisterBeamFnStateServer(wk.server, wk)
+	return wk
+}
+
+func (wk *W) Endpoint() string {
+	return wk.lis.Addr().String()
+}
+
+// Serve serves on the started listener. Blocks.
+func (wk *W) Serve() {
+	wk.server.Serve(wk.lis)
+}
+
+func (wk *W) String() string {
+	return "worker[" + wk.ID + "]"
+}
+
+func (wk *W) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.String("ID", wk.ID),
+		slog.String("endpoint", wk.Endpoint()),
+	)
+}
+
+// Stop the GRPC server.
+func (wk *W) Stop() {
+	slog.Debug("stopping", "worker", wk)
+	close(wk.InstReqs)
+	close(wk.DataReqs)
+	wk.server.Stop()
+	wk.lis.Close()
+	slog.Debug("stopped", "worker", wk)
+}
+
+func (wk *W) NextInst() string {
+	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
+}
+
+func (wk *W) NextStage() string {
+	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+}
+
+// TODO set logging level.
+var minsev = fnpb.LogEntry_Severity_DEBUG
+
+// Logging relates SDK worker messages back to the job that spawned them.
+// Messages are received from the SDK,
+func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			slog.Error("logging.Recv", err, "worker", wk)
+			return err
+		}
+		for _, l := range in.GetLogEntries() {
+			if l.Severity >= minsev {
+				// TODO: Connect to the associated Job for this worker instead of
+				// logging locally for SDK side logging.
+				slog.Log(toSlogSev(l.GetSeverity()), l.GetMessage(),
+					slog.String(slog.SourceKey, l.GetLogLocation()),
+					slog.Time(slog.TimeKey, l.GetTimestamp().AsTime()),
+					"worker", wk,
+				)
+			}
+		}
+	}
+}
+
+func toSlogSev(sev fnpb.LogEntry_Severity_Enum) slog.Level {
+	switch sev {
+	case fnpb.LogEntry_Severity_TRACE:
+		return slog.Level(-8) //
+	case fnpb.LogEntry_Severity_DEBUG:
+		return slog.LevelDebug // -4
+	case fnpb.LogEntry_Severity_INFO:
+		return slog.LevelInfo // 0
+	case fnpb.LogEntry_Severity_NOTICE:
+		return slog.Level(2)
+	case fnpb.LogEntry_Severity_WARN:
+		return slog.LevelWarn // 4
+	case fnpb.LogEntry_Severity_ERROR:
+		return slog.LevelError // 8
+	case fnpb.LogEntry_Severity_CRITICAL:
+		return slog.Level(10)
+	}
+	return slog.LevelInfo
+}
+
+func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+	desc, ok := wk.Descriptors[req.GetProcessBundleDescriptorId()]
+	if !ok {
+		return nil, fmt.Errorf("descriptor %v not found", req.GetProcessBundleDescriptorId())
+	}
+	return desc, nil
+}
+
+// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
+//
+// Requests come from the runner, and are sent to the client in the SDK.
+func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+	done := make(chan bool)
+	go func() {
+		for {
+			resp, err := ctrl.Recv()
+			if err == io.EOF {
+				slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
+				done <- true // means stream is finished
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled: // Might ignore this all the time instead.
+					slog.Error("ctrl.Recv Canceled", err, "worker", wk)
+					done <- true // means stream is finished
+					return
+				default:
+					slog.Error("ctrl.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+
+			// TODO: Do more than assume these are ProcessBundleResponses.
+			wk.mu.Lock()
+			if b, ok := wk.bundles[resp.GetInstructionId()]; ok {
+				// TODO. Better pipeline error handling.
+				if resp.Error != "" {
+					slog.Log(slog.LevelError, "ctrl.Recv pipeline error", slog.ErrorKey, resp.GetError())
+					panic(resp.GetError())
+				}
+				b.Resp <- resp.GetProcessBundle()
+			} else {
+				slog.Debug("ctrl.Recv: %v", resp)
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.InstReqs {
+		ctrl.Send(req)
+	}
+	slog.Debug("ctrl.Send finished waiting on done")
+	<-done
+	slog.Debug("Control done")
+	return nil
+}
+
+// Data relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// Data is multiplexed on a single stream for all active bundles on a worker.
+func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
+	go func() {
+		for {
+			resp, err := data.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("data.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("data.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			wk.mu.Lock()
+			for _, d := range resp.GetData() {
+				b, ok := wk.bundles[d.GetInstructionId()]
+				if !ok {
+					slog.Info("data.Recv for unknown bundle", "response", resp)
+					continue
+				}
+				colID := b.SinkToPCollection[d.GetTransformId()]
+
+				// There might not be data, eg. for side inputs, so we need to reconcile this elsewhere for
+				// downstream side inputs.
+				if len(d.GetData()) > 0 {
+					b.OutputData.WriteData(colID, d.GetData())
+				}
+				if d.GetIsLast() {
+					b.dataWait.Done()
+				}
+			}
+			wk.mu.Unlock()
+		}
+	}()
+
+	for req := range wk.DataReqs {
+		if err := data.Send(req); err != nil {
+			slog.Log(slog.LevelDebug, "data.Send error", slog.ErrorKey, err)
+		}
+	}
+	return nil
+}
+
+// State relays elements and timer bytes to SDKs and back again, coordinated via
+// ProcessBundle instructionIDs, and receiving input transforms.
+//
+// State requests come from SDKs, and the runner responds.
+func (wk *W) State(state fnpb.BeamFnState_StateServer) error {
+	responses := make(chan *fnpb.StateResponse)
+	go func() {
+		// This go routine creates all responses to state requests from the worker
+		// so we want to close the State handler when it's all done.
+		defer close(responses)
+		for {
+			req, err := state.Recv()
+			if err == io.EOF {
+				return
+			}
+			if err != nil {
+				switch status.Code(err) {
+				case codes.Canceled:
+					slog.Error("state.Recv Canceled", err, "worker", wk)
+					return
+				default:
+					slog.Error("state.Recv failed", err, "worker", wk)
+					panic(err)
+				}
+			}
+			switch req.GetRequest().(type) {
+			case *fnpb.StateRequest_Get:
+				// TODO: move data handling to be pcollection based.
+				b := wk.bundles[req.GetInstructionId()]
+				key := req.GetStateKey()
+				slog.Debug("StateRequest_Get", prototext.Format(req), "bundle", b)
+
+				var data [][]byte
+				switch key.GetType().(type) {
+				case *fnpb.StateKey_IterableSideInput_:
+					ikey := key.GetIterableSideInput()
+					wKey := ikey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					winMap := b.IterableSideInputData[ikey.GetTransformId()][ikey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] I Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+					data = winMap[w]
+
+				case *fnpb.StateKey_MultimapSideInput_:
+					mmkey := key.GetMultimapSideInput()
+					wKey := mmkey.GetWindow()
+					var w typex.Window
+					if len(wKey) == 0 {
+						w = window.GlobalWindow{}
+					} else {
+						w, err = exec.MakeWindowDecoder(coder.NewIntervalWindow()).DecodeSingle(bytes.NewBuffer(wKey))
+						if err != nil {
+							panic(fmt.Sprintf("error decoding iterable side input window key %v: %v", wKey, err))
+						}
+					}
+					dKey := mmkey.GetKey()
+					winMap := b.MultiMapSideInputData[mmkey.GetTransformId()][mmkey.GetSideInputId()]
+					var wins []typex.Window
+					for w := range winMap {
+						wins = append(wins, w)
+					}
+					slog.Debug(fmt.Sprintf("side input[%v][%v] MM Key: %v Windows: %v", req.GetId(), req.GetInstructionId(), w, wins))
+
+					data = winMap[w][string(dKey)]
+
+				default:
+					panic(fmt.Sprintf("unsupported StateKey Access type: %T: %v", key.GetType(), prototext.Format(key)))
+				}
+
+				// Encode the runner iterable (no length, just consecutive elements), and send it out.
+				// This is also where we can handle things like State Backed Iterables.
+				var buf bytes.Buffer
+				for _, value := range data {
+					buf.Write(value)
+				}
+				responses <- &fnpb.StateResponse{
+					Id: req.GetId(),
+					Response: &fnpb.StateResponse_Get{
+						Get: &fnpb.StateGetResponse{
+							Data: buf.Bytes(),
+						},
+					},
+				}
+			default:
+				panic(fmt.Sprintf("unsupported StateRequest kind %T: %v", req.GetRequest(), prototext.Format(req)))
+			}
+		}
+	}()
+	for resp := range responses {
+		if err := state.Send(resp); err != nil {
+			slog.Error("state.Send error", err)
+		}
+	}
+	return nil
+}
+
+// DataService is slated to be deleted in favour of stage based state
+// management for side inputs.
+type DataService struct {
+	// TODO actually quick process the data to windows here as well.
+	raw map[string][][]byte
+}
+
+// Commit tentative data to the datastore.
+func (d *DataService) Commit(tent engine.TentativeData) {
+	if d.raw == nil {
+		d.raw = map[string][][]byte{}

Review Comment:
   Partly due to an earlier iteration on the design.
   
   Generally, it's a [Go Proverb](https://go-proverbs.github.io/) it's ideal to [Make the Zero Value useful](https://www.youtube.com/watch?v=PAAkCSZUG1c&t=385s), especially when others are expecting to use the type. It simplifies initialization of the type, in particularly in larger data structures.
   
   It's less critical when it's scope is only in a single package or project, but it's certainly a goal to strive towards.
   
   If something may not be used, then the most optimal thing is to never create it. It never puts pressure on the heap, and never needs to be garbage collected. In TentativeData, a bundle may not have any output at all, and this is unknowable at construction time. So it's cheap enough to check, and construct it on demand, then carry on identically.
   
   On the otherhand, if something is going to be used 100% of the time, it's usually better to provide a New function or an Init method to ensure everything is ready for downstream use. Deciding between those is usually dependant on how many internal fields exist.  eg. The B type is largely manipulated by it's client package via it's fields at the moment, and creating a NewB function for it, while providing compile time safety in what must be provided for initialization, isn't clearly the best approach yet. 
   
   Being able to revisit these choices is possible because the only consumers of these packages are part of the prism runner, so we don't risk breaking unknown users via these changes, as long as the runner works.
   
   As you can see, the right approach is situational. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1435724476

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25478](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0c7676b) into [master](https://codecov.io/gh/apache/beam/commit/8283e4fc0dd6fd2912aea82a023dab7b39bb139a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8283e4f) will **decrease** coverage by `0.10%`.
   > The diff coverage is `66.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #25478      +/-   ##
   ==========================================
   - Coverage   72.93%   72.84%   -0.10%     
   ==========================================
     Files         748      755       +7     
     Lines       99366   100425    +1059     
   ==========================================
   + Hits        72476    73152     +676     
   - Misses      25523    25854     +331     
   - Partials     1367     1419      +52     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `52.25% <66.00%> (+0.39%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../go/pkg/beam/runners/prism/internal/engine/data.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2VuZ2luZS9kYXRhLmdv) | `0.00% <0.00%> (ø)` | |
   | [...o/pkg/beam/runners/prism/internal/worker/worker.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3dvcmtlci93b3JrZXIuZ28=) | `63.42% <63.42%> (ø)` | |
   | [...o/pkg/beam/runners/prism/internal/worker/bundle.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL3dvcmtlci9idW5kbGUuZ28=) | `90.24% <90.24%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/sdf.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9zZGYuZ28=) | `66.12% <0.00%> (-4.89%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/metrics/dumper.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL21ldHJpY3MvZHVtcGVyLmdv) | `49.20% <0.00%> (-4.77%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/xlangx/expand.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUveGxhbmd4L2V4cGFuZC5nbw==) | `0.00% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/runners/prism/internal/coders.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2NvZGVycy5nbw==) | `91.39% <0.00%> (ø)` | |
   | [...beam/runners/prism/internal/jobservices/metrics.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2pvYnNlcnZpY2VzL21ldHJpY3MuZ28=) | `70.83% <0.00%> (ø)` | |
   | [...o/pkg/beam/core/runtime/exec/sdf\_invokers\_arity.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9zZGZfaW52b2tlcnNfYXJpdHkuZ28=) | `31.91% <0.00%> (ø)` | |
   | [...pkg/beam/runners/prism/internal/engine/strategy.go](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9ydW5uZXJzL3ByaXNtL2ludGVybmFsL2VuZ2luZS9zdHJhdGVneS5nbw==) | `50.00% <0.00%> (ø)` | |
   | ... and [7 more](https://codecov.io/gh/apache/beam/pull/25478?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on pull request #25478: [#24789][prism] internal/worker + tentative data

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25478:
URL: https://github.com/apache/beam/pull/25478#issuecomment-1430781359

   R: @jrmccluskey 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org