You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/04/25 23:39:30 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #26101: [Go SDK] Timers with new datalayer

lostluck commented on code in PR #26101:
URL: https://github.com/apache/beam/pull/26101#discussion_r1177040045


##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))

Review Comment:
   Just use `time.Now()` directly here, since it's getting converted to a `time.Time` anyway.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
+	s.TimerTime.Write(sp, toFire)
+
+	return nil
+}
+
+type eventtimeSDFStream struct {

Review Comment:
   ```suggestion
   // eventtimeSDFStream is a Watermark Estimating, Unbounded Splittable DoFn that emits a single element, then sleeps until the next emit by the runner.
   // After emitting the configured number of elements, it stops processing, allowing the pipeline to terminate.
   type eventtimeSDFStream struct {
   ```



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.

Review Comment:
   Once the changes have been made to the pipeline, we can update this description to match it (e.g. it's using a periodic sequence, and it will self-terminate, so we don't need to tell people to cancel it, etc.).



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -345,6 +355,41 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
 	return val, nil
 }
 
+func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, tmap TimerRecv) (*FullValue, error) {
+	// Defer side input clean-up in case of panic
+	var err error
+	defer func() {
+		if postErr := n.postInvoke(); postErr != nil {
+			err = postErr
+		}
+	}()
+	if err := n.preInvoke(ctx, tmap.Windows, tmap.HoldTimestamp); err != nil {
+		return nil, err
+	}
+
+	var extra []any
+	extra = append(extra, timerFamilyID)
+
+	if tmap.Tag != "" {
+		extra = append(extra, tmap.Tag)
+	}
+	extra = append(extra, n.cache.extra...)
+	val, err := InvokeWithOpts(ctx, fn, tmap.Pane, tmap.Windows, tmap.HoldTimestamp, InvokeOpts{
+		opt:   &MainInput{Key: *tmap.Key},
+		bf:    n.bf,
+		we:    n.we,
+		sa:    n.UState,
+		sr:    n.reader,
+		ta:    n.Timer,
+		tm:    n.timerManager,
+		extra: extra,
+	})

Review Comment:
   Please create and cache the onTimer invoker, like we do for processElement.
   
   It's likely that with the move to more streaming and the single element bundles, we'll want to *(not now but later)* optimize the StartBundle and FinishBundle calls too. But I'll need to get a performance measure before doing that.
   
   Besides, it looks like there's a faster, zero-alloc `reflect.Caller` thing that may land with Go 1.21. If that performs well enough, that work (and all the generic generated stuff) might become moot. *fingers crossed*



##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package timers contains structs for setting pipeline timers.
+package timers
+
+import (
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+var (
+	// ProviderType represents the type of timer provider.
+	ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
+)
+
+// TimeDomainEnum represents different time domains to set timer.
+type TimeDomainEnum int32
+
+const (
+	// TimeDomainUnspecified represents unspecified time domain.
+	TimeDomainUnspecified TimeDomainEnum = 0
+	// TimeDomainEventTime is time from the perspective of the data
+	TimeDomainEventTime TimeDomainEnum = 1
+	// TimeDomainProcessingTime is time from the perspective of the
+	// execution of your pipeline
+	TimeDomainProcessingTime TimeDomainEnum = 2
+)
+
+// TimerMap holds timer information obtained from the pipeline.
+type TimerMap struct {
+	Family                       string
+	Tag                          string
+	Clear                        bool
+	FireTimestamp, HoldTimestamp mtime.Time
+}
+
+type timerConfig struct {
+	Tag           string
+	HoldTimestamp mtime.Time
+}
+
+type timerOptions func(*timerConfig)
+
+// WithTag sets the tag for the timer.
+func WithTag(tag string) timerOptions {
+	return func(tm *timerConfig) {
+		tm.Tag = tag
+	}
+}
+
+// WithOutputTimestamp sets the output timestamp for the timer.
+func WithOutputTimestamp(outputTimestamp time.Time) timerOptions {
+	return func(tm *timerConfig) {
+		tm.HoldTimestamp = mtime.FromTime(outputTimestamp)
+	}
+}
+
+// Provider represents a timer provider interface.
+type Provider interface {
+	Set(t TimerMap)
+}
+
+// PipelineTimer interface represents valid timer type.
+type PipelineTimer interface {
+	TimerFamily() string
+	TimerDomain() TimeDomainEnum

Review Comment:
   To enable composites, these two methods are basically replaced with a single `BeamTimers() map[string]TimeDomainEnum` method, and the implementations similarly return a `map[string]TimeDomainEnum`.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)

Review Comment:
   Remove, these are unused in this example.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
+	s.TimerTime.Write(sp, toFire)
+
+	return nil
+}
+
+type eventtimeSDFStream struct {
+	RestSize, Mod, Fixed int64
+	Sleep                time.Duration
+}
+
+func (fn *eventtimeSDFStream) Setup() error {
+	return nil
+}
+
+func (fn *eventtimeSDFStream) CreateInitialRestriction(v beam.T) offsetrange.Restriction {
+	return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *eventtimeSDFStream) SplitRestriction(v beam.T, r offsetrange.Restriction) []offsetrange.Restriction {
+	// No split
+	return []offsetrange.Restriction{r}
+}
+
+func (fn *eventtimeSDFStream) RestrictionSize(v beam.T, r offsetrange.Restriction) float64 {
+	return r.Size()
+}
+
+func (fn *eventtimeSDFStream) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *eventtimeSDFStream) ProcessElement(ctx context.Context, _ *CWE, rt *sdf.LockRTracker, v beam.T, emit func(beam.EventTime, int64)) sdf.ProcessContinuation {
+	r := rt.GetRestriction().(offsetrange.Restriction)
+	i := r.Start
+	if r.Size() < 1 {
+		log.Debugf(ctx, "size 0 restriction, stoping to process sentinel %v", slog.Any("value", v))
+		return sdf.StopProcessing()
+	}
+	slog.Debug("emitting element to restriction", slog.Any("value", v), slog.Group("restriction",
+		slog.Any("value", v),
+		slog.Float64("size", r.Size()),
+		slog.Int64("pos", i),
+	))
+	if rt.TryClaim(i) {
+		v := (i % fn.Mod) + fn.Fixed
+		emit(mtime.Now(), v)
+	}
+	return sdf.ResumeProcessingIn(fn.Sleep)
+}
+
+func (fn *eventtimeSDFStream) InitialWatermarkEstimatorState(_ beam.EventTime, _ offsetrange.Restriction, _ beam.T) int64 {
+	return int64(mtime.MinTimestamp)
+}
+
+func (fn *eventtimeSDFStream) CreateWatermarkEstimator(initialState int64) *CWE {
+	return &CWE{Watermark: initialState}
+}
+
+func (fn *eventtimeSDFStream) WatermarkEstimatorState(e *CWE) int64 {
+	return e.Watermark
+}
+
+type CWE struct {
+	Watermark int64 // uses int64, since the SDK prevent mtime.Time from serialization.
+}
+
+func (e *CWE) CurrentWatermark() time.Time {
+	return mtime.Time(e.Watermark).ToTime()
+}
+
+func (e *CWE) ObserveTimestamp(ts time.Time) {
+	// We add 10 milliseconds to allow window boundaries to
+	// progress after emitting
+	e.Watermark = int64(mtime.FromTime(ts.Add(-90 * time.Millisecond)))
+}
+
+func init() {
+	register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{})
+	register.Emitter2[string, string]()
+	register.DoFn5x1[context.Context, *CWE, *sdf.LockRTracker, beam.T, func(beam.EventTime, int64), sdf.ProcessContinuation]((*eventtimeSDFStream)(nil))
+	register.Emitter2[beam.EventTime, int64]()
+}
+
+func main() {
+	flag.Parse()
+	beam.Init()
+
+	ctx := context.Background()
+
+	log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)

Review Comment:
   Remove, we don't publish to pubsub in this example.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
+	s.TimerTime.Write(sp, toFire)
+
+	return nil
+}
+
+type eventtimeSDFStream struct {
+	RestSize, Mod, Fixed int64
+	Sleep                time.Duration
+}
+
+func (fn *eventtimeSDFStream) Setup() error {
+	return nil
+}
+
+func (fn *eventtimeSDFStream) CreateInitialRestriction(v beam.T) offsetrange.Restriction {
+	return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *eventtimeSDFStream) SplitRestriction(v beam.T, r offsetrange.Restriction) []offsetrange.Restriction {
+	// No split
+	return []offsetrange.Restriction{r}
+}
+
+func (fn *eventtimeSDFStream) RestrictionSize(v beam.T, r offsetrange.Restriction) float64 {
+	return r.Size()
+}
+
+func (fn *eventtimeSDFStream) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *eventtimeSDFStream) ProcessElement(ctx context.Context, _ *CWE, rt *sdf.LockRTracker, v beam.T, emit func(beam.EventTime, int64)) sdf.ProcessContinuation {
+	r := rt.GetRestriction().(offsetrange.Restriction)
+	i := r.Start
+	if r.Size() < 1 {
+		log.Debugf(ctx, "size 0 restriction, stoping to process sentinel %v", slog.Any("value", v))

Review Comment:
   We can change this to a `log.Infof`, so it shows up in logs easily. It's a toy example, so it's nice to have indications of the benefit.



##########
sdks/go/pkg/beam/core/funcx/fn.go:
##########


Review Comment:
   Please add unit tests for some of the new paths in the state machine & the TimerProvider method to fn_test.go
   
   https://github.com/apache/beam/blob/601dd58a5763e75c06c71cf98ae7c2d4c9591f8d/sdks/go/pkg/beam/core/funcx/fn_test.go#L51



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########


Review Comment:
   Same here, Unit tests: https://github.com/apache/beam/blob/601dd58a5763e75c06c71cf98ae7c2d4c9591f8d/sdks/go/pkg/beam/core/graph/fn_test.go



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1350,6 +1385,57 @@ func validateState(fn *DoFn, numIn mainInputs) error {
 	return nil
 }
 
+func validateOnTimerFn(fn *DoFn) error {
+	if _, ok := fn.OnTimerFn(); !ok {
+		err := errors.Errorf("OnTimer function not defined for DoFn: %v", fn.Name())
+		return errors.SetTopLevelMsgf(err, "OnTimer function not defined for DoFn: %v. Ensure that OnTimer function is implemented for the DoFn.", fn.Name())
+	}
+
+	return nil
+}
+
+func validateTimer(fn *DoFn) error {
+	if fn.Fn == nil {
+		return nil
+	}
+
+	pt := fn.PipelineTimers()
+
+	if _, ok := fn.Fn.TimerProvider(); ok {
+		if len(pt) == 0 {
+			err := errors.New("ProcessElement uses a TimerProvider, but no Timer fields are defined in the DoFn")
+			return errors.SetTopLevelMsgf(err, "ProcessElement uses a TimerProvider, but no timer fields are defined in the DoFn"+
+				", Ensure that you are including the exported timer field in the DoFn that you're using to set/clear timers.")

Review Comment:
   ```suggestion
   				", Ensure that your DoFn exports the Timer fields used to set and clear timers.")
   ```



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1350,6 +1385,57 @@ func validateState(fn *DoFn, numIn mainInputs) error {
 	return nil
 }
 
+func validateOnTimerFn(fn *DoFn) error {
+	if _, ok := fn.OnTimerFn(); !ok {
+		err := errors.Errorf("OnTimer function not defined for DoFn: %v", fn.Name())
+		return errors.SetTopLevelMsgf(err, "OnTimer function not defined for DoFn: %v. Ensure that OnTimer function is implemented for the DoFn.", fn.Name())
+	}
+

Review Comment:
   This should validate that any emitters match ProcessElement exactly.
   
   It doesn't seem like OnTimer should support side inputs. Reuven has mentioned the semantics would be unclear (eg. which timers etc).



##########
sdks/go/pkg/beam/core/runtime/exec/coder.go:
##########
@@ -1282,20 +1299,19 @@ func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io.
 			return errors.WithContext(err, "error encoding paneinfo")
 		}
 	}
-
 	w.Write(b.Bytes())
+
 	return nil
 }
 
 // decodeTimer decodes timer byte encoded with standard timer coder spec.
-func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (typex.TimerMap, error) {
-	tm := typex.TimerMap{}
-
-	fv, err := dec.Decode(r)
+func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (TimerRecv, error) {
+	tm := TimerRecv{}
+	key, err := dec.Decode(r)
 	if err != nil {
-		return tm, errors.WithContext(err, "error decoding timer key")

Review Comment:
   I think we can keep the "timer key" here, since it's referring to the key that this timer is associated with (distinct from the timer family).



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -263,8 +263,17 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
 		}
 	},
 		func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
-			tmap, err := decodeTimer(cp, wc, bcr)
-			log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v  err: %v", ptransformID, timerFamilyID, tmap, err)
+			dc := MakeElementDecoder(coder.SkipW(c).Components[0])

Review Comment:
   Move this out of the per element loop please. It's better that it's accidentally called all the time because of single element bundles, than called on every individual timer in a single batch.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
+	s.TimerTime.Write(sp, toFire)
+
+	return nil
+}
+
+type eventtimeSDFStream struct {
+	RestSize, Mod, Fixed int64
+	Sleep                time.Duration
+}
+
+func (fn *eventtimeSDFStream) Setup() error {
+	return nil
+}
+
+func (fn *eventtimeSDFStream) CreateInitialRestriction(v beam.T) offsetrange.Restriction {
+	return offsetrange.Restriction{Start: 0, End: fn.RestSize}
+}
+
+func (fn *eventtimeSDFStream) SplitRestriction(v beam.T, r offsetrange.Restriction) []offsetrange.Restriction {
+	// No split
+	return []offsetrange.Restriction{r}
+}
+
+func (fn *eventtimeSDFStream) RestrictionSize(v beam.T, r offsetrange.Restriction) float64 {
+	return r.Size()
+}
+
+func (fn *eventtimeSDFStream) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(r))
+}
+
+func (fn *eventtimeSDFStream) ProcessElement(ctx context.Context, _ *CWE, rt *sdf.LockRTracker, v beam.T, emit func(beam.EventTime, int64)) sdf.ProcessContinuation {
+	r := rt.GetRestriction().(offsetrange.Restriction)
+	i := r.Start
+	if r.Size() < 1 {
+		log.Debugf(ctx, "size 0 restriction, stoping to process sentinel %v", slog.Any("value", v))
+		return sdf.StopProcessing()
+	}
+	slog.Debug("emitting element to restriction", slog.Any("value", v), slog.Group("restriction",
+		slog.Any("value", v),
+		slog.Float64("size", r.Size()),
+		slog.Int64("pos", i),
+	))

Review Comment:
   We need to remove the references to `slog` in this example for the time being. It's not yet part of the standard library, and doesn't currently add value over the beam log package.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -1350,6 +1385,57 @@ func validateState(fn *DoFn, numIn mainInputs) error {
 	return nil
 }
 
+func validateOnTimerFn(fn *DoFn) error {
+	if _, ok := fn.OnTimerFn(); !ok {
+		err := errors.Errorf("OnTimer function not defined for DoFn: %v", fn.Name())
+		return errors.SetTopLevelMsgf(err, "OnTimer function not defined for DoFn: %v. Ensure that OnTimer function is implemented for the DoFn.", fn.Name())
+	}
+
+	return nil
+}
+
+func validateTimer(fn *DoFn) error {
+	if fn.Fn == nil {
+		return nil
+	}
+
+	pt := fn.PipelineTimers()
+
+	if _, ok := fn.Fn.TimerProvider(); ok {
+		if len(pt) == 0 {
+			err := errors.New("ProcessElement uses a TimerProvider, but no Timer fields are defined in the DoFn")
+			return errors.SetTopLevelMsgf(err, "ProcessElement uses a TimerProvider, but no timer fields are defined in the DoFn"+
+				", Ensure that you are including the exported timer field in the DoFn that you're using to set/clear timers.")
+		}
+		timerKeys := make(map[string]timers.PipelineTimer)
+		for _, t := range pt {
+			k := t.TimerFamily()
+			if timer, ok := timerKeys[k]; ok {
+				err := errors.Errorf("Duplicate timer key %v", k)
+				return errors.SetTopLevelMsgf(err, "Duplicate timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k, timer, t)

Review Comment:
   We should have the error match the Programming Guide term, "timer family ID" not "timer key".
   
   ```suggestion
   				err := errors.Errorf("Duplicate timer family ID %v", k)
   				return errors.SetTopLevelMsgf(err, "Duplicate timer family ID %v used by %v and %v. Ensure that timer family IDs are unique per DoFn", k, timer, t)
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -263,8 +263,17 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
 		}
 	},
 		func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
-			tmap, err := decodeTimer(cp, wc, bcr)
-			log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v  err: %v", ptransformID, timerFamilyID, tmap, err)
+			dc := MakeElementDecoder(coder.SkipW(c).Components[0])
+			tmap, err := decodeTimer(dc, wc, bcr)
+			if err != nil {
+				return errors.WithContext(err, "error decoding timer in datasource")
+			}
+			if fn, ok := n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok {
+				_, err := n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, tmap)
+				if err != nil {
+					return errors.WithContext(err, "ontimer callback invocation failed")
+				}
+			}

Review Comment:
   It's worth having a WARN log here saying that `"expected transform <PTransformID>  to have an OnTimer method to accept "<timerFamilyID>" callback, but it did not. Please file an issue with Apache Beam with a reproducing DoFn: https://github.com/apache/beam/issues"`



##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package timers contains structs for setting pipeline timers.
+package timers
+
+import (
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+)
+
+var (
+	// ProviderType represents the type of timer provider.
+	ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
+)
+
+// TimeDomainEnum represents different time domains to set timer.
+type TimeDomainEnum int32

Review Comment:
   I'd just call this "TimeDomain" instead of saying it's an enum.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
+	s.TimerTime.Write(sp, toFire)
+
+	return nil
+}
+
+type eventtimeSDFStream struct {

Review Comment:
   Though TBH we should replace this with a [Periodic Sequence](https://pkg.go.dev/github.com/apache/beam/sdks/v2@v2.47.0-RC1/go/pkg/beam/transforms/periodic#Sequence) which will simplify the example implementation. Replacing ~100 lines with ~5-8 seems like a readability win.
   
   Also, I know that means we can ensure the pipeline terminates.



##########
sdks/go/examples/timer_wordcap/wordcap.go:
##########
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
+// does the following:
+//
+//	(1) create a topic and publish a few messages to it
+//	(2) Set user state and timer
+//
+// NOTE: it only runs on Dataflow and must be manually cancelled.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"golang.org/x/exp/slog"
+)
+
+var (
+	input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
+)
+
+var (
+	data = []string{
+		"foo",
+		"bar",
+		"baz",
+	}
+)
+
+type Stateful struct {
+	ElementBag state.Bag[string]
+	TimerTime  state.Value[int64]
+	MinTime    state.Combining[int64, int64, int64]
+
+	OutputState timers.ProcessingTime
+}
+
+func NewStateful() *Stateful {
+	return &Stateful{
+		ElementBag: state.MakeBagState[string]("elementBag"),
+		TimerTime:  state.MakeValueState[int64]("timerTime"),
+		MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+
+		OutputState: timers.InProcessingTime("outputState"),
+	}
+}
+
+func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
+	switch timerKey {
+	case "outputState":
+		log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
+		s.OutputState.Clear(tp)
+		switch timerTag {
+		case "001":
+			log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
+			s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
+			emit(timerKey, timerTag)
+		}
+	}
+}
+
+func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
+	s.ElementBag.Add(sp, word)
+	s.MinTime.Add(sp, int64(ts))
+
+	toFire, ok, err := s.TimerTime.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		toFire = int64(mtime.Now().Add(1 * time.Minute))
+	}
+	minTime, _, err := s.MinTime.Read(sp)
+	if err != nil {
+		return err
+	}
+
+	s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))

Review Comment:
   IIRC, we have both toFire and minTime in millis? We should just call https://pkg.go.dev/time#UnixMilli here instead.
   
   Ideally we'd just store time.Times directly in the state, but that feels like a different bug to fix if it doesn't already work.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########


Review Comment:
   Do note that you can largely take the existing "Stateful" dofn from the example. There are also other stateful DoFn examples towards the bottom.
   
   We likely don't need every "bad" path to be unit tested, beyond at least a quick manual check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org