You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/03/19 19:38:03 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #25808: [#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK

lostluck commented on code in PR #25808:
URL: https://github.com/apache/beam/pull/25808#discussion_r1141423750


##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main

Review Comment:
   It's important to have a blank line after the license and before the package documentation, so the license header isn't treated as the package documentation.  
   
   I've added a package comment, which should conform to the style guide: https://google.github.io/styleguide/go/decisions#package-comments
   
   ```suggestion
   // limitations under the License.
   
   // slowly_updating_side_input is an example pipeline demonstrating the pattern described
   // at https://beam.apache.org/documentation/patterns/side-inputs/.
   package main
   ```



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+	"context"
+	"flag"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+	register.Function4x0(update)
+	register.Function4x0(process)
+	register.Emitter2[int, string]()
+	register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
+	log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
+
+	// zero is the key used in beam.AddFixedKey which will be applied on the main input.
+	id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)
+
+	emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
+	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+	iter := side(k)
+
+	var externalData []string
+	var externalDatum string
+	for iter(&externalDatum) {
+		externalData = append(externalData, externalDatum)
+	}
+
+	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+	if err != nil {
+		log.Fatalf(context.TODO(), format, args...)
+	}
+}
+
+func main() {
+	var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+	var periodicSequenceInterval time.Duration
+
+	now := time.Now()
+
+	flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
+		"The time at which to start the periodic sequence.")
+
+	flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
+		"The time at which to end the periodic sequence.")
+
+	flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
+		"The interval between periodic sequence output.")
+
+	flag.StringVar(&inputTopic, "input_topic", "input",
+		"The PubSub topic from which to read the main input data.")

Review Comment:
   A little non-standard, but I appreciate how this cleans up and ensures consistency in the default times values.



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+	"context"
+	"flag"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+	register.Function4x0(update)
+	register.Function4x0(process)
+	register.Emitter2[int, string]()
+	register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
+	log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
+
+	// zero is the key used in beam.AddFixedKey which will be applied on the main input.
+	id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)
+
+	emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
+	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+	iter := side(k)
+
+	var externalData []string
+	var externalDatum string
+	for iter(&externalDatum) {
+		externalData = append(externalData, externalDatum)
+	}
+
+	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+	if err != nil {
+		log.Fatalf(context.TODO(), format, args...)
+	}
+}
+
+func main() {
+	var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+	var periodicSequenceInterval time.Duration
+
+	now := time.Now()
+
+	flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
+		"The time at which to start the periodic sequence.")
+
+	flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
+		"The time at which to end the periodic sequence.")
+
+	flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
+		"The interval between periodic sequence output.")
+
+	flag.StringVar(&inputTopic, "input_topic", "input",
+		"The PubSub topic from which to read the main input data.")
+
+	flag.Parse()
+	beam.Init()
+	ctx := context.Background()
+	p, s := beam.NewPipelineWithRoot()
+
+	project := gcpopts.GetProject(ctx)
+	client, err := pubsub.NewClient(ctx, project)
+	fatalf(err, "Failed to create client: %v", err)
+	_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
+	fatalf(err, "Failed to ensure topic: %v", err)
+
+	mainInput := beam.WindowInto(
+		s,
+		window.NewFixedWindows(periodicSequenceInterval),
+		beam.AddFixedKey( // simulate keyed data by adding a fixed key
+			s,
+			pubsubio.Read(
+				s,
+				project,
+				inputTopic,
+				nil,
+			),
+		),
+		beam.Trigger(trigger.Repeat(trigger.Always())),
+		beam.PanesDiscard(),
+	)
+
+	startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart)
+	endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd)
+	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
+		beam.ParDo(
+			s,
+			update,
+			periodic.Impulse(
+				s,
+				startTime,
+				endTime,
+				periodicSequenceInterval,
+			),
+		),
+		beam.Trigger(trigger.Repeat(trigger.Always())),
+		beam.PanesDiscard(),
+	)

Review Comment:
   The nesting here also has the drawback of hiding the core of the example.
   
   ```suggestion
      // Generate an impulse every period.
       periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval)
       
       // Use the impulse to trigger some other ordinary transform.
       updatedImp := beam.ParDo(s, update, periodicImp)
       
       // Window for use as a side input, to allow the input to change with windows.
   	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
   		updatedImp,
   		beam.Trigger(trigger.Repeat(trigger.Always())),
   		beam.PanesDiscard(),
   	)
   ```
   
   I'll note that the window for the side input is usually going to be larger than the window for the main processing. While this isn't wrong, the usual goal of this pattern is something like letting files that change hourly be read in once each hour, so that the more frequent main-input data can re-use the cached, already-read file. (Granted, this behavior isn't yet enabled by default in the Go SDK, but that's an aside.)



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time

Review Comment:
   This will work in the direct runner, but that's because the direct runner won't successfully run the example or anything local. It would be better to fold things into the sequence definition to enable appropriate testing behavior.



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+	"context"
+	"flag"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+	register.Function4x0(update)
+	register.Function4x0(process)
+	register.Emitter2[int, string]()
+	register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
+	log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
+
+	// zero is the key used in beam.AddFixedKey which will be applied on the main input.
+	id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)
+
+	emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key
+func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
+	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+	iter := side(k)
+
+	var externalData []string
+	var externalDatum string
+	for iter(&externalDatum) {
+		externalData = append(externalData, externalDatum)
+	}
+
+	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
+}
+
+func fatalf(err error, format string, args ...interface{}) {
+	if err != nil {
+		log.Fatalf(context.TODO(), format, args...)
+	}
+}
+
+func main() {
+	var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+	var periodicSequenceInterval time.Duration
+
+	now := time.Now()
+
+	flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
+		"The time at which to start the periodic sequence.")
+
+	flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
+		"The time at which to end the periodic sequence.")
+
+	flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
+		"The interval between periodic sequence output.")
+
+	flag.StringVar(&inputTopic, "input_topic", "input",
+		"The PubSub topic from which to read the main input data.")
+
+	flag.Parse()
+	beam.Init()
+	ctx := context.Background()
+	p, s := beam.NewPipelineWithRoot()
+
+	project := gcpopts.GetProject(ctx)
+	client, err := pubsub.NewClient(ctx, project)
+	fatalf(err, "Failed to create client: %v", err)
+	_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
+	fatalf(err, "Failed to ensure topic: %v", err)
+
+	mainInput := beam.WindowInto(
+		s,
+		window.NewFixedWindows(periodicSequenceInterval),
+		beam.AddFixedKey( // simulate keyed data by adding a fixed key
+			s,
+			pubsubio.Read(
+				s,
+				project,
+				inputTopic,
+				nil,
+			),
+		),
+		beam.Trigger(trigger.Repeat(trigger.Always())),
+		beam.PanesDiscard(),
+	)

Review Comment:
   Don't do this. It is far less readable than using single-use variables, which make the structure of the pipeline explicit and linear with the reading of the program.
   
   Nesting like this forces the reader to read to the middle of the code and then backtrack up in order to understand the pipeline. That isn't helpful for example code, which will be read far more often than it is written.
   
   ```suggestion
       source :=  pubsubio.Read(s, project, inputTopic, nil)
       keyedSource :=	beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key
   	mainInput := beam.WindowInto(
   		s,
   		keyedSource,
   		window.NewFixedWindows(periodicSequenceInterval),
   		beam.Trigger(trigger.Repeat(trigger.Always())),
   		beam.PanesDiscard(),
   	)
   ```



##########
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package main
+
+import (
+	"context"
+	"flag"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+func init() {
+	register.Function4x0(update)
+	register.Function4x0(process)
+	register.Emitter2[int, string]()
+	register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
+	log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))

Review Comment:
   It's a small but critical detail to clarify that this is event time, since it might not match processing time, depending on how the source is configured.
   
   ```suggestion
   	log.Infof(ctx, "Making external call %d at event time %s", i, t.ToTime().Format(time.RFC3339))
   ```



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########


Review Comment:
   We should be able to add a short unit test if using the `prism` runner directly. 
   
   While the runner isn't complete yet, it *does* run and execute ProcessContinuation transforms and watermarks!
   
   It doesn't do splitting yet, or actually "wait" for any process continuations at the moment. But when the "sequence" is done, it will terminate, so we can add a test with a period of one second and a duration of one minute, and then check that we get 60 elements out of the transform. (Small risk of getting 59 instead, as a flake...)



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {

Review Comment:
   Just to explain what happens here and how it works:
   
   The transform will continue until the restriction is fully processed, or until the runner has told the bundle to split. On that split, TryClaim returns false, GetError returns nil, and IsDone returns false, so we reschedule for later with the ResumeProcessingIn continuation. Note that this isn't a hard timer, and a runner may resume early or late (usually early).
   
   The output watermark is held to the last output timestamp. When processing stops (StopProcessing is returned), the watermark can then advance to wherever the source is at (probably the end of the global window).



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {
+			emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+			currentOutputIndex += 1
+			currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+			currentTime = fn.now()
+			we.UpdateWatermark(currentOutputTimestamp)
+		} else if err := rt.GetError(); err != nil || rt.IsDone() {
+			// Stop processing on error or completion
+			return sdf.StopProcessing(), rt.GetError()
+		} else {
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+	}
+
+	return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+	ApplyWindow bool
+
+	now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+	return func(o *impulseConfig) error {
+		o.ApplyWindow = true
+		return nil
+	}
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {

Review Comment:
   As mentioned previously, this can't be serialized, so it's non-portable (and it's complicated to validate once it has passed through serialization, even if it's hacked around). I'd remove it.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {
+			emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+			currentOutputIndex += 1
+			currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+			currentTime = fn.now()
+			we.UpdateWatermark(currentOutputTimestamp)
+		} else if err := rt.GetError(); err != nil || rt.IsDone() {
+			// Stop processing on error or completion
+			return sdf.StopProcessing(), rt.GetError()
+		} else {
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+	}
+
+	return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+	ApplyWindow bool
+
+	now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption

Review Comment:
   Since this is immediately aliased in the same package, the code may as well export the type directly. The indirection adds nothing but possible confusion for readers switching between two ways of referring to the same type.
   
   Per the style guide: Don't use aliases if they aren't needed https://google.github.io/styleguide/go/decisions#type-aliases



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {
+			emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+			currentOutputIndex += 1
+			currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+			currentTime = fn.now()
+			we.UpdateWatermark(currentOutputTimestamp)
+		} else if err := rt.GetError(); err != nil || rt.IsDone() {
+			// Stop processing on error or completion
+			return sdf.StopProcessing(), rt.GetError()
+		} else {
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+	}
+
+	return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+	ApplyWindow bool
+
+	now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {

Review Comment:
   I don't expect additional options to be added, so let's match Java and Python here and use a boolean.
   
   Effectively, this transform just produces a PCollection, which means any additional functionality can be added with ordinary DoFns or other composites. There's no need to overcomplicate things here.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {
+			emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+			currentOutputIndex += 1
+			currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+			currentTime = fn.now()
+			we.UpdateWatermark(currentOutputTimestamp)
+		} else if err := rt.GetError(); err != nil || rt.IsDone() {
+			// Stop processing on error or completion
+			return sdf.StopProcessing(), rt.GetError()
+		} else {
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+	}
+
+	return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+	ApplyWindow bool
+
+	now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+	return func(o *impulseConfig) error {
+		o.ApplyWindow = true
+		return nil
+	}
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {
+	return func(o *impulseConfig) error {
+		o.now = now
+		return nil
+	}
+}
+
+// Impulse is a PTransform which generates a sequence of timestamped
+// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
+// element will be assigned to its own fixed window of interval size.
+//
+// The transform behaves the same as [Sequence] transform, but can be
+// used as the first transform in a pipeline.
+//
+// The following applies to the arguments.
+//   - if interval <= 0, interval is set to [math.MaxInt64]
+//   - if start is a zero value [time.Time], start is set to the current time
+//   - if start is after end, start is set to end
+//
+// The PCollection generated by Impulse is unbounded and the output elements
+// are the [time.UnixMilli] int64 values of the output timestamp.
+func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection {
+	if interval <= 0 {
+		interval = math.MaxInt64
+	}
+	if start.IsZero() {
+		start = time.Now()
+	}
+	if start.After(end) {
+		start = end
+	}

Review Comment:
   And `mtime.Time` values are prevented from being serialized.
   
   I'd use int64s: it makes things clear, and it's an implementation detail (I did so for a similar, test-specific transform for testing prism). It also reduces the likelihood of architecture-specific issues, since int's width is architecture dependent.
   
   TBH, it's odd to me that the "values" for the periodic impulses are time instants, which are the event times anyway. I'd prefer we avoid the needless duplication Java and Python have by just using int for Sequence and []byte{} for Impulse. Int is specific and clear enough for Sequence (it's a sequence of numbers, done), but int64 is likely better for clarity on range across architectures. []byte is what the normal Impulse returns, so using it would make this transform a more compatible drop-in for existing transforms that consume the []byte, even if the value is ignored.
   
   We can also just encode the time/timestamp into the []byte, if we so choose, but again, it's redundant.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}

Review Comment:
   It should look at the existing restriction and modify start and end appropriately, but this might be OK for periodic sequence (especially if it's been validated to work as expected on a runner like Google Cloud Dataflow).
   
   Basically, the restriction will be immediately provided to the ProcessElement method, and that method needs to handle the given restriction correctly, so it's probably nice to know what the last "state" of the restriction is, even if a ProcessContinuation Stop is immediately returned next.
   
   It's a little unintuitive, but to avoid data loss, drain can be thought of as simply advancing all root watermarks immediately to EndOfGlobalWindow and allowing the pipeline to terminate processing. TruncateRestriction exists to re-process current restrictions and shrink them. E.g., TruncateRestriction is where an IO could communicate with, and write to, an external system about what hasn't been processed yet, so the next caller of that system can pick up where it left off. Alternatively, a system like Pubsub effectively handles that case by simply not acking any of the outstanding messages, so they'll be delivered the next time the subscriber reads the topic. Different systems, both effective.



##########
sdks/go/pkg/beam/transforms/periodic/periodic.go:
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	Interval time.Duration
+	Start    time.Time
+	End      time.Time
+}
+
+type sequenceGenDoFn struct {
+	now func() time.Time
+}
+
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
+	return offsetrange.Restriction{
+		Start: int64(0),
+		End:   int64(totalOutputs),
+	}
+}
+
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restrication.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if rt.TryClaim(currentOutputIndex) {
+			emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+			currentOutputIndex += 1
+			currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+			currentTime = fn.now()
+			we.UpdateWatermark(currentOutputTimestamp)
+		} else if err := rt.GetError(); err != nil || rt.IsDone() {
+			// Stop processing on error or completion
+			return sdf.StopProcessing(), rt.GetError()
+		} else {
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+	}
+
+	return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
+}
+
+type impulseConfig struct {
+	ApplyWindow bool
+
+	now func() time.Time
+}
+
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+	return func(o *impulseConfig) error {
+		o.ApplyWindow = true
+		return nil
+	}
+}
+
+func withNowFunc(now func() time.Time) ImpulseOption {
+	return func(o *impulseConfig) error {
+		o.now = now
+		return nil
+	}
+}
+
+// Impulse is a PTransform which generates a sequence of timestamped
+// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
+// element will be assigned to its own fixed window of interval size.
+//
+// The transform behaves the same as [Sequence] transform, but can be
+// used as the first transform in a pipeline.
+//
+// The following applies to the arguments.
+//   - if interval <= 0, interval is set to [math.MaxInt64]
+//   - if start is a zero value [time.Time], start is set to the current time
+//   - if start is after end, start is set to end
+//
+// The PCollection generated by Impulse is unbounded and the output elements
+// are the [time.UnixMilli] int64 values of the output timestamp.
+func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection {
+	if interval <= 0 {
+		interval = math.MaxInt64
+	}
+	if start.IsZero() {
+		start = time.Now()
+	}
+	if start.After(end) {
+		start = end
+	}

Review Comment:
   For handling End, we can also just have a variable set to the "largest practical time" and use that. It will be very rare for someone to want to process a stream of data that extends past the year 9999 (which we could work around by changing the serialization... but that's not urgent now).
   
   We could also pick another arbitrary time with a nanosecond-scale difference as a sentinel value (e.g. Zero + 1 nanosecond), which we check for and treat as End of Time. IIRC, Beam doesn't promise to handle event-time granularities finer than microseconds (if finer than milliseconds at all...).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org