Posted to github@beam.apache.org by "johannaojeling (via GitHub)" <gi...@apache.org> on 2023/02/21 16:59:30 UTC

[GitHub] [beam] johannaojeling commented on a diff in pull request #25568: [#24789][prism] V0 Go Direct Runner Replacement, Prism.

johannaojeling commented on code in PR #25568:
URL: https://github.com/apache/beam/pull/25568#discussion_r1113306024


##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,

Review Comment:
   ```suggestion
   // so they can be processed by another bundle,
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can

Review Comment:
   ```suggestion
   // separate_test.go retains structures and tests to ensure the runner can
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+	initRunner(t)
+
+	ws.initRPCServer()
+
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "ProcessContinuations_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &sepHarnessSdfStream{
+					Base: sepHarnessBase{
+						WatcherID:         ws.newWatcher(3),
+						Sleep:             time.Second,
+						IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+						LocalService:      ws.serviceAddress,
+					},
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global num ints", count)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &singleStepSdfStream{
+					Sleep:    time.Second,
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global stepped num ints", count)
+				sum := beam.ParDo(s, dofn2x1, imp, beam.SideInput{Input: out})
+				beam.ParDo(s, &int64Check{Name: "stepped", Want: []int{45}}, sum)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_fixedWindow",
+			pipeline: func(s beam.Scope) {
+				elms, mod := 1000, 10
+				count := int(elms / mod)
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &eventtimeSDFStream{
+					Sleep:    time.Second,
+					RestSize: int64(elms),
+					Mod:      int64(mod),
+					Fixed:    1,
+				}, imp)
+				windowed := beam.WindowInto(s, window.NewFixedWindows(time.Second*10), out)
+				sum := stats.Sum(s, windowed)
+				// We expect each window to be processed ASAP, and produced one
+				// at a time, with the same results.
+				beam.ParDo(s, &int64Check{Name: "single", Want: []int{55}}, sum)
+				// But we need to receive the expected number of identical results
+				gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum)
+				passert.Count(s, gsum, "total sums", count)
+			},
+		},
+	}
+
+	// TODO: Channel Splits
+	// TODO: SubElement/dynamic splits.
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func init() {
+	register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+	return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+	id                         int
+	mu                         sync.Mutex
+	sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.Int("id", w.id),
+		slog.Int("sentinelCount", w.sentinelCount),
+		slog.Int("sentinelCap", w.sentinelCap),
+	)
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+	mu             sync.Mutex
+	nextID         int
+	lookup         map[int]*watcher
+	serviceOnce    sync.Once
+	serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methdos.
+type Args struct {
+	WatcherID int
+}
+
+// Block is called once per sentinel, to indicate it will block
+// until all sentinels are blocked.
+func (ws *Watchers) Block(args *Args, _ *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	w.mu.Unlock()
+	return nil
+}
+
+// Check returns whether the sentinels are unblocked or not.
+func (ws *Watchers) Check(args *Args, unblocked *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	*unblocked = w.sentinelCount >= w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked)
+	return nil
+}
+
+// Delay returns whether the sentinels shoudld delay.
+// This increments the sentinel cap, and returns unblocked.
+// Intended to validate ProcessContinuation behavior.
+func (ws *Watchers) Delay(args *Args, delay *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	// Delay as long as the sentinel count is under the cap.
+	*delay = w.sentinelCount < w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("Delay: sentinel target", "watcher", w, slog.Bool("delay", *delay))
+	return nil
+}
+
+func (ws *Watchers) initRPCServer() {
+	ws.serviceOnce.Do(func() {
+		l, err := net.Listen("tcp", ":0")
+		if err != nil {
+			panic(err)
+		}
+		rpc.Register(ws)
+		rpc.HandleHTTP()
+		go http.Serve(l, nil)
+		ws.serviceAddress = l.Addr().String()
+	})
+}
+
+// newWatcher starts an rpc server to maange state for watching for

Review Comment:
   ```suggestion
   // newWatcher starts an rpc server to manage state for watching for
   ```
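
For context on the harness mechanics quoted above, here is a minimal sketch of how a DoFn-side helper could talk to the Watchers "net/rpc" service: report a blocked sentinel with `Watchers.Block`, then poll `Watchers.Check` until the runner has separated all sentinels. This is not code from the PR; the helper name `blockOnSentinel` and the 100ms poll interval are illustrative assumptions.

```go
package sketch

import (
	"fmt"
	"net/rpc"
	"time"
)

// Args mirrors the RPC parameter struct registered by the Watchers service above.
type Args struct {
	WatcherID int
}

// blockOnSentinel is a hypothetical helper: it reports one blocked sentinel,
// then polls until the watcher reports every sentinel for this ID is blocked.
func blockOnSentinel(addr string, watcherID int) error {
	// Same dial style as sepHarnessBase.setup in the quoted file.
	client, err := rpc.DialHTTP("tcp", addr)
	if err != nil {
		return fmt.Errorf("dialing watcher service %v: %w", addr, err)
	}
	defer client.Close()

	// Tell the watcher this sentinel is now blocking.
	var ignored bool
	if err := client.Call("Watchers.Block", &Args{WatcherID: watcherID}, &ignored); err != nil {
		return err
	}
	// Poll until the sentinel count reaches the watcher's cap.
	for {
		var unblocked bool
		if err := client.Call("Watchers.Check", &Args{WatcherID: watcherID}, &unblocked); err != nil {
			return err
		}
		if unblocked {
			return nil // the runner has separated all sentinels; safe to proceed
		}
		time.Sleep(100 * time.Millisecond)
	}
}
```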



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+	initRunner(t)
+
+	ws.initRPCServer()
+
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "ProcessContinuations_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &sepHarnessSdfStream{
+					Base: sepHarnessBase{
+						WatcherID:         ws.newWatcher(3),
+						Sleep:             time.Second,
+						IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+						LocalService:      ws.serviceAddress,
+					},
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global num ints", count)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &singleStepSdfStream{
+					Sleep:    time.Second,
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global stepped num ints", count)
+				sum := beam.ParDo(s, dofn2x1, imp, beam.SideInput{Input: out})
+				beam.ParDo(s, &int64Check{Name: "stepped", Want: []int{45}}, sum)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_fixedWindow",
+			pipeline: func(s beam.Scope) {
+				elms, mod := 1000, 10
+				count := int(elms / mod)
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &eventtimeSDFStream{
+					Sleep:    time.Second,
+					RestSize: int64(elms),
+					Mod:      int64(mod),
+					Fixed:    1,
+				}, imp)
+				windowed := beam.WindowInto(s, window.NewFixedWindows(time.Second*10), out)
+				sum := stats.Sum(s, windowed)
+				// We expect each window to be processed ASAP, and produced one
+				// at a time, with the same results.
+				beam.ParDo(s, &int64Check{Name: "single", Want: []int{55}}, sum)
+				// But we need to receive the expected number of identical results
+				gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum)
+				passert.Count(s, gsum, "total sums", count)
+			},
+		},
+	}
+
+	// TODO: Channel Splits
+	// TODO: SubElement/dynamic splits.
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func init() {
+	register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+	return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+	id                         int
+	mu                         sync.Mutex
+	sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.Int("id", w.id),
+		slog.Int("sentinelCount", w.sentinelCount),
+		slog.Int("sentinelCap", w.sentinelCap),
+	)
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+	mu             sync.Mutex
+	nextID         int
+	lookup         map[int]*watcher
+	serviceOnce    sync.Once
+	serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methdos.
+type Args struct {
+	WatcherID int
+}
+
+// Block is called once per sentinel, to indicate it will block
+// until all sentinels are blocked.
+func (ws *Watchers) Block(args *Args, _ *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	w.mu.Unlock()
+	return nil
+}
+
+// Check returns whether the sentinels are unblocked or not.
+func (ws *Watchers) Check(args *Args, unblocked *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	*unblocked = w.sentinelCount >= w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked)
+	return nil
+}
+
+// Delay returns whether the sentinels shoudld delay.
+// This increments the sentinel cap, and returns unblocked.
+// Intended to validate ProcessContinuation behavior.
+func (ws *Watchers) Delay(args *Args, delay *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	// Delay as long as the sentinel count is under the cap.
+	*delay = w.sentinelCount < w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("Delay: sentinel target", "watcher", w, slog.Bool("delay", *delay))
+	return nil
+}
+
+func (ws *Watchers) initRPCServer() {
+	ws.serviceOnce.Do(func() {
+		l, err := net.Listen("tcp", ":0")
+		if err != nil {
+			panic(err)
+		}
+		rpc.Register(ws)
+		rpc.HandleHTTP()
+		go http.Serve(l, nil)
+		ws.serviceAddress = l.Addr().String()
+	})
+}
+
+// newWatcher starts an rpc server to maange state for watching for
+// sentinels across local machines.
+func (ws *Watchers) newWatcher(sentinelCap int) int {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	ws.initRPCServer()
+	if ws.lookup == nil {
+		ws.lookup = map[int]*watcher{}
+	}
+	w := &watcher{id: ws.nextID, sentinelCap: sentinelCap}
+	ws.nextID++
+	ws.lookup[w.id] = w
+	return w.id
+}
+
+// sepHarnessBase contains fields and functions that are shared by all
+// versions of the separation harness.
+type sepHarnessBase struct {
+	WatcherID         int
+	Sleep             time.Duration
+	IsSentinelEncoded beam.EncodedFunc
+	LocalService      string
+}
+
+// One connection per binary.
+var (
+	sepClientOnce sync.Once
+	sepClient     *rpc.Client
+	sepClientMu   sync.Mutex
+	sepWaitMap    map[int]chan struct{}
+)
+
+func (fn *sepHarnessBase) setup() error {
+	sepClientMu.Lock()
+	defer sepClientMu.Unlock()
+	sepClientOnce.Do(func() {
+		client, err := rpc.DialHTTP("tcp", fn.LocalService)
+		if err != nil {
+			slog.Error("failed to dial sentinels  server", err, slog.String("endpoint", fn.LocalService))
+			panic(fmt.Sprintf("dialing sentinels server %v: %v", fn.LocalService, err))
+		}
+		sepClient = client
+		sepWaitMap = map[int]chan struct{}{}
+	})
+
+	// Check if there's alreaedy a local channel for this id, and if not

Review Comment:
   ```suggestion
   	// Check if there's already a local channel for this id, and if not
   ```



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+	initRunner(t)
+
+	ws.initRPCServer()
+
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "ProcessContinuations_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &sepHarnessSdfStream{
+					Base: sepHarnessBase{
+						WatcherID:         ws.newWatcher(3),
+						Sleep:             time.Second,
+						IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+						LocalService:      ws.serviceAddress,
+					},
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global num ints", count)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &singleStepSdfStream{
+					Sleep:    time.Second,
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global stepped num ints", count)
+				sum := beam.ParDo(s, dofn2x1, imp, beam.SideInput{Input: out})
+				beam.ParDo(s, &int64Check{Name: "stepped", Want: []int{45}}, sum)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_fixedWindow",
+			pipeline: func(s beam.Scope) {
+				elms, mod := 1000, 10
+				count := int(elms / mod)
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &eventtimeSDFStream{
+					Sleep:    time.Second,
+					RestSize: int64(elms),
+					Mod:      int64(mod),
+					Fixed:    1,
+				}, imp)
+				windowed := beam.WindowInto(s, window.NewFixedWindows(time.Second*10), out)
+				sum := stats.Sum(s, windowed)
+				// We expect each window to be processed ASAP, and produced one
+				// at a time, with the same results.
+				beam.ParDo(s, &int64Check{Name: "single", Want: []int{55}}, sum)
+				// But we need to receive the expected number of identical results
+				gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum)
+				passert.Count(s, gsum, "total sums", count)
+			},
+		},
+	}
+
+	// TODO: Channel Splits
+	// TODO: SubElement/dynamic splits.
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func init() {
+	register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+	return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+	id                         int
+	mu                         sync.Mutex
+	sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.Int("id", w.id),
+		slog.Int("sentinelCount", w.sentinelCount),
+		slog.Int("sentinelCap", w.sentinelCap),
+	)
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+	mu             sync.Mutex
+	nextID         int
+	lookup         map[int]*watcher
+	serviceOnce    sync.Once
+	serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methdos.
+type Args struct {
+	WatcherID int
+}
+
+// Block is called once per sentinel, to indicate it will block
+// until all sentinels are blocked.
+func (ws *Watchers) Block(args *Args, _ *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	w.mu.Unlock()
+	return nil
+}
+
+// Check returns whether the sentinels are unblocked or not.
+func (ws *Watchers) Check(args *Args, unblocked *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	*unblocked = w.sentinelCount >= w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked)
+	return nil
+}
+
+// Delay returns whether the sentinels shoudld delay.
+// This increments the sentinel cap, and returns unblocked.
+// Intended to validate ProcessContinuation behavior.
+func (ws *Watchers) Delay(args *Args, delay *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	// Delay as long as the sentinel count is under the cap.
+	*delay = w.sentinelCount < w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("Delay: sentinel target", "watcher", w, slog.Bool("delay", *delay))
+	return nil
+}
+
+func (ws *Watchers) initRPCServer() {
+	ws.serviceOnce.Do(func() {
+		l, err := net.Listen("tcp", ":0")
+		if err != nil {
+			panic(err)
+		}
+		rpc.Register(ws)
+		rpc.HandleHTTP()
+		go http.Serve(l, nil)
+		ws.serviceAddress = l.Addr().String()
+	})
+}
+
+// newWatcher starts an rpc server to maange state for watching for
+// sentinels across local machines.
+func (ws *Watchers) newWatcher(sentinelCap int) int {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	ws.initRPCServer()
+	if ws.lookup == nil {
+		ws.lookup = map[int]*watcher{}
+	}
+	w := &watcher{id: ws.nextID, sentinelCap: sentinelCap}
+	ws.nextID++
+	ws.lookup[w.id] = w
+	return w.id
+}
+
+// sepHarnessBase contains fields and functions that are shared by all
+// versions of the separation harness.
+type sepHarnessBase struct {
+	WatcherID         int
+	Sleep             time.Duration
+	IsSentinelEncoded beam.EncodedFunc
+	LocalService      string
+}
+
+// One connection per binary.
+var (
+	sepClientOnce sync.Once
+	sepClient     *rpc.Client
+	sepClientMu   sync.Mutex
+	sepWaitMap    map[int]chan struct{}
+)
+
+func (fn *sepHarnessBase) setup() error {
+	sepClientMu.Lock()
+	defer sepClientMu.Unlock()
+	sepClientOnce.Do(func() {
+		client, err := rpc.DialHTTP("tcp", fn.LocalService)
+		if err != nil {
+			slog.Error("failed to dial sentinels  server", err, slog.String("endpoint", fn.LocalService))
+			panic(fmt.Sprintf("dialing sentinels server %v: %v", fn.LocalService, err))
+		}
+		sepClient = client
+		sepWaitMap = map[int]chan struct{}{}
+	})
+
+	// Check if there's alreaedy a local channel for this id, and if not
+	// start a watcher goroutine to poll and unblock the harness when
+	// the expected number of ssentinels is reached.

Review Comment:
   ```suggestion
   	// the expected number of sentinels is reached.
   ```
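
The comment above describes a per-watcher wait channel that a polling goroutine closes once all sentinels are blocked. Below is a self-contained sketch of that pattern; the `waitChanFor` helper, the package-level map, and the 100ms poll interval are assumptions for illustration, not the PR's implementation.

```go
package sketch

import (
	"net/rpc"
	"sync"
	"time"
)

// Args matches the Watchers RPC parameter struct in the test file.
type Args struct{ WatcherID int }

var (
	waitMu  sync.Mutex
	waitMap = map[int]chan struct{}{}
)

// waitChanFor returns a channel that is closed once Watchers.Check reports the
// watcher's sentinels are all blocked. The first caller for an ID starts the
// polling goroutine; later callers simply receive on the same channel.
func waitChanFor(client *rpc.Client, watcherID int) chan struct{} {
	waitMu.Lock()
	defer waitMu.Unlock()
	if c, ok := waitMap[watcherID]; ok {
		return c
	}
	c := make(chan struct{})
	waitMap[watcherID] = c
	go func() {
		for {
			var unblocked bool
			if err := client.Call("Watchers.Check", &Args{WatcherID: watcherID}, &unblocked); err != nil {
				panic(err)
			}
			if unblocked {
				close(c) // wakes every goroutine blocked on <-c
				return
			}
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return c
}
```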



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+	initRunner(t)
+
+	ws.initRPCServer()
+
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "ProcessContinuations_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &sepHarnessSdfStream{
+					Base: sepHarnessBase{
+						WatcherID:         ws.newWatcher(3),
+						Sleep:             time.Second,
+						IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+						LocalService:      ws.serviceAddress,
+					},
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global num ints", count)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &singleStepSdfStream{
+					Sleep:    time.Second,
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global stepped num ints", count)
+				sum := beam.ParDo(s, dofn2x1, imp, beam.SideInput{Input: out})
+				beam.ParDo(s, &int64Check{Name: "stepped", Want: []int{45}}, sum)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_fixedWindow",
+			pipeline: func(s beam.Scope) {
+				elms, mod := 1000, 10
+				count := int(elms / mod)
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &eventtimeSDFStream{
+					Sleep:    time.Second,
+					RestSize: int64(elms),
+					Mod:      int64(mod),
+					Fixed:    1,
+				}, imp)
+				windowed := beam.WindowInto(s, window.NewFixedWindows(time.Second*10), out)
+				sum := stats.Sum(s, windowed)
+				// We expect each window to be processed ASAP, and produced one
+				// at a time, with the same results.
+				beam.ParDo(s, &int64Check{Name: "single", Want: []int{55}}, sum)
+				// But we need to receive the expected number of identical results
+				gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum)
+				passert.Count(s, gsum, "total sums", count)
+			},
+		},
+	}
+
+	// TODO: Channel Splits
+	// TODO: SubElement/dynamic splits.
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func init() {
+	register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+	return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+	id                         int
+	mu                         sync.Mutex
+	sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.Int("id", w.id),
+		slog.Int("sentinelCount", w.sentinelCount),
+		slog.Int("sentinelCap", w.sentinelCap),
+	)
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+	mu             sync.Mutex
+	nextID         int
+	lookup         map[int]*watcher
+	serviceOnce    sync.Once
+	serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methdos.
+type Args struct {
+	WatcherID int
+}
+
+// Block is called once per sentinel, to indicate it will block
+// until all sentinels are blocked.
+func (ws *Watchers) Block(args *Args, _ *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	w.sentinelCount++
+	w.mu.Unlock()
+	return nil
+}
+
+// Check returns whether the sentinels are unblocked or not.
+func (ws *Watchers) Check(args *Args, unblocked *bool) error {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	w, ok := ws.lookup[args.WatcherID]
+	if !ok {
+		return fmt.Errorf("no watcher with id %v", args.WatcherID)
+	}
+	w.mu.Lock()
+	*unblocked = w.sentinelCount >= w.sentinelCap
+	w.mu.Unlock()
+	slog.Debug("sentinel target for watcher%d is %d/%d. unblocked=%v", args.WatcherID, w.sentinelCount, w.sentinelCap, *unblocked)
+	return nil
+}
+
+// Delay returns whether the sentinels shoudld delay.

Review Comment:
   ```suggestion
   // Delay returns whether the sentinels should delay.
   ```
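
The `Delay` RPC pairs naturally with the SDK's `ProcessContinuation` values: a delayed element self-checkpoints and is resumed later, while an undelayed one can run to completion. A hedged sketch of that decision follows; the `delayOrStop` helper is an assumption for illustration, not the PR's DoFn.

```go
package sketch

import (
	"net/rpc"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
)

// Args matches the Watchers RPC parameter struct in the test file.
type Args struct{ WatcherID int }

// delayOrStop asks the Watchers service whether this element should be
// deferred, and maps the answer onto a ProcessContinuation: resume after
// resumeIn if delayed, otherwise signal that processing can finish.
func delayOrStop(client *rpc.Client, watcherID int, resumeIn time.Duration) (sdf.ProcessContinuation, error) {
	var delay bool
	if err := client.Call("Watchers.Delay", &Args{WatcherID: watcherID}, &delay); err != nil {
		return nil, err
	}
	if delay {
		return sdf.ResumeProcessingIn(resumeIn), nil // self-checkpoint; the runner resumes later
	}
	return sdf.StopProcessing(), nil // no resumption needed for this element
}
```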



##########
sdks/go/pkg/beam/runners/prism/internal/separate_test.go:
##########
@@ -0,0 +1,593 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"net/rpc"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
+	"golang.org/x/exp/slog"
+)
+
+// separate_test.go is retains structures and tests to ensure the runner can
+// perform separation, and terminate checkpoints.
+
+// Global variable, so only one is registered with the OS.
+var ws = &Watchers{}
+
+// TestSeparation validates that the runner is able to split
+// elements in time and space. Beam has a few mechanisms to
+// do this.
+//
+// First is channel splits, where a slowly processing
+// bundle might have it's remaining buffered elements truncated
+// so they can be processed by a another bundle,
+// possibly simultaneously.
+//
+// Second is sub element splitting, where a single element
+// in an SDF might be split into smaller restrictions.
+//
+// Third with Checkpointing or ProcessContinuations,
+// a User DoFn may decide to defer processing of an element
+// until later, permitting a bundle to terminate earlier,
+// delaying processing.
+//
+// All these may be tested locally or in process with a small
+// server the DoFns can connect to. This can then indicate which
+// elements, or positions are considered "sentinels".
+//
+// When a sentinel is to be processed, instead the DoFn blocks.
+// The goal for Splitting tests is to succeed only when all
+// sentinels are blocking waiting to be processed.
+// This indicates the runner has "separated" the sentinels, hence
+// the name "separation harness tests".
+//
+// Delayed Process Continuations can be similiarly tested,
+// as this emulates external processing servers anyway.
+// It's much simpler though, as the request is to determine if
+// a given element should be delayed or not. This could be used
+// for arbitrarily complex splitting patterns, as desired.
+func TestSeparation(t *testing.T) {
+	initRunner(t)
+
+	ws.initRPCServer()
+
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "ProcessContinuations_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &sepHarnessSdfStream{
+					Base: sepHarnessBase{
+						WatcherID:         ws.newWatcher(3),
+						Sleep:             time.Second,
+						IsSentinelEncoded: beam.EncodedFunc{Fn: reflectx.MakeFunc(allSentinel)},
+						LocalService:      ws.serviceAddress,
+					},
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global num ints", count)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_globalWindow",
+			pipeline: func(s beam.Scope) {
+				count := 10
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &singleStepSdfStream{
+					Sleep:    time.Second,
+					RestSize: int64(count),
+				}, imp)
+				passert.Count(s, out, "global stepped num ints", count)
+				sum := beam.ParDo(s, dofn2x1, imp, beam.SideInput{Input: out})
+				beam.ParDo(s, &int64Check{Name: "stepped", Want: []int{45}}, sum)
+			},
+		}, {
+			name: "ProcessContinuations_stepped_combine_fixedWindow",
+			pipeline: func(s beam.Scope) {
+				elms, mod := 1000, 10
+				count := int(elms / mod)
+				imp := beam.Impulse(s)
+				out := beam.ParDo(s, &eventtimeSDFStream{
+					Sleep:    time.Second,
+					RestSize: int64(elms),
+					Mod:      int64(mod),
+					Fixed:    1,
+				}, imp)
+				windowed := beam.WindowInto(s, window.NewFixedWindows(time.Second*10), out)
+				sum := stats.Sum(s, windowed)
+				// We expect each window to be processed ASAP, and produced one
+				// at a time, with the same results.
+				beam.ParDo(s, &int64Check{Name: "single", Want: []int{55}}, sum)
+				// But we need to receive the expected number of identical results
+				gsum := beam.WindowInto(s, window.NewGlobalWindows(), sum)
+				passert.Count(s, gsum, "total sums", count)
+			},
+		},
+	}
+
+	// TODO: Channel Splits
+	// TODO: SubElement/dynamic splits.
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func init() {
+	register.Function1x1(allSentinel)
+}
+
+// allSentinel indicates that all elements are sentinels.
+func allSentinel(v beam.T) bool {
+	return true
+}
+
+// Watcher is an instance of the counters.
+type watcher struct {
+	id                         int
+	mu                         sync.Mutex
+	sentinelCount, sentinelCap int
+}
+
+func (w *watcher) LogValue() slog.Value {
+	return slog.GroupValue(
+		slog.Int("id", w.id),
+		slog.Int("sentinelCount", w.sentinelCount),
+		slog.Int("sentinelCap", w.sentinelCap),
+	)
+}
+
+// Watchers is a "net/rpc" service.
+type Watchers struct {
+	mu             sync.Mutex
+	nextID         int
+	lookup         map[int]*watcher
+	serviceOnce    sync.Once
+	serviceAddress string
+}
+
+// Args is the set of parameters to the watchers RPC methdos.

Review Comment:
   ```suggestion
   // Args is the set of parameters to the watchers RPC methods.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org