You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "riteshghorse (via GitHub)" <gi...@apache.org> on 2023/07/14 15:01:45 UTC

[GitHub] [beam] riteshghorse commented on a diff in pull request #27496: [#22737] Add Go SDK timers to programming guide

riteshghorse commented on code in PR #27496:
URL: https://github.com/apache/beam/pull/27496#discussion_r1263830842


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6435,6 +6433,10 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" timer_output_timestamps_bad >}}
+{{< /highlight >}}\

Review Comment:
   ```suggestion
   {{< /highlight >}}
   ```



##########
sdks/go/examples/snippets/04transforms.go:
##########
@@ -644,40 +701,422 @@ type combineFn struct{}
 // combiningStateFn keeps track of the number of elements seen.
 type combiningStateFn struct {
 	// types are the types of the accumulator, input, and output respectively
-	val state.Combining[int, int, int]
+	Val state.Combining[int, int, int]
 }
 
 func (s *combiningStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error {
 	// Get the value stored in our state
-	val, _, err := s.val.Read(p)
+	val, _, err := s.Val.Read(p)
 	if err != nil {
 		return err
 	}
-	s.val.Add(p, 1)
+	s.Val.Add(p, 1)
 
 	if val > 10000 {
 		// Example of clearing and starting again with an empty bag
-		s.val.Clear(p)
+		s.Val.Clear(p)
 	}
 
 	return nil
 }
 
-func main() {
+func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
 	// ...
 	// CombineFn param can be a simple fn like this or a structural CombineFn
 	cFn := state.MakeCombiningState[int, int, int]("stateKey", func(a, b int) int {
 		return a + b
 	})
+	combined := beam.ParDo(s, combiningStateFn{Val: cFn}, input)
+
 	// ...
 
 	// [END combining_state]
 
-	fmt.Print(cFn)
+	return combined
+}
+
+// [START event_time_timer]
+
+type eventTimerDoFn struct {
+	State state.Value[int64]
+	Timer timers.EventTime
+}
+
+func (fn *eventTimerDoFn) ProcessElement(ts beam.EventTime, sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) {
+	// ...
+
+	// Set an event-time timer to the element timestamp.
+	fn.Timer.Set(tp, ts.ToTime())
+
+	// ...
+}
+
+func (fn *eventTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) {
+	switch timer.Family {
+	case fn.Timer.Family:
+		// process callback for this timer
+	}
+}
+
+func AddEventTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, &eventTimerDoFn{
+		// Timers are given family names so their callbacks can be handled independantly.
+		Timer: timers.InEventTime("processWatermark"),
+		State: state.MakeValueState[int64]("latest"),
+	}, in)
+}
+
+// [END event_time_timer]
+
+// [START processing_time_timer]
+
+type processingTimerDoFn struct {
+	Timer timers.ProcessingTime
+}
+
+func (fn *processingTimerDoFn) ProcessElement(sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) {
+	// ...
+
+	// Set a timer to go off 30 seconds in the future.
+	fn.Timer.Set(tp, time.Now().Add(30*time.Second))
+
+	// ...
+}
+
+func (fn *processingTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) {
+	switch timer.Family {
+	case fn.Timer.Family:
+		// process callback for this timer
+	}
+}
+
+func AddProcessingTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, &processingTimerDoFn{
+		// Timers are given family names so their callbacks can be handled independantly.
+		Timer: timers.InProcessingTime("timer"),
+	}, in)
+}
+
+// [END processing_time_timer]
+
+// [START dynamic_timer_tags]
+
+type hasAction interface {
+	Action() string
+}
+
+type dynamicTagsDoFn[V hasAction] struct {
+	Timer timers.EventTime
+}
+
+func (fn *dynamicTagsDoFn[V]) ProcessElement(ts beam.EventTime, tp timers.Provider, key string, value V, emitWords func(string)) {
+	// ...
+
+	// Set a timer to go off 30 seconds in the future.
+	fn.Timer.Set(tp, ts.ToTime(), timers.WithTag(value.Action()))
+
+	// ...
+}
+
+func (fn *dynamicTagsDoFn[V]) OnTimer(tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) {
+	switch timer.Family {
+	case fn.Timer.Family:
+		tag := timer.Tag // Do something with fired tag
+		_ = tag
+	}
+}
+
+func AddDynamicTimerTagsDoFn[V hasAction](s beam.Scope, in beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, &dynamicTagsDoFn[V]{
+		Timer: timers.InEventTime("actionTimers"),
+	}, in)
+}
+
+// [END dynamic_timer_tags]
+
+// [START timer_output_timestamps_bad]
+
+type badTimerOutputTimestampsFn[V any] struct {
+	ElementBag  state.Bag[V]
+	TimerSet    state.Value[bool]
+	OutputState timers.ProcessingTime
+}
+
+func (fn *badTimerOutputTimestampsFn[V]) ProcessElement(sp state.Provider, tp timers.Provider, key string, value V, emit func(string)) error {
+	// Add the current element to the bag for this key.
+	if err := fn.ElementBag.Add(sp, value); err != nil {
+		return err
+	}
+	set, _, err := fn.TimerSet.Read(sp)
+	if err != nil {
+		return err
+	}
+	if !set {
+		fn.OutputState.Set(tp, time.Now().Add(1*time.Minute))
+		fn.TimerSet.Write(sp, true)
+	}
+	return nil
+}
+
+func (fn *badTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string)) error {
+	switch timer.Family {
+	case fn.OutputState.Family:
+		vs, _, err := fn.ElementBag.Read(sp)
+		if err != nil {
+			return err
+		}
+		for _, v := range vs {
+			// Output each element
+			emit(fmt.Sprintf("%v", v))
+		}
+
+		fn.ElementBag.Clear(sp)
+		// Note that the timer has now fired.
+		fn.TimerSet.Clear(sp)
+	}
+	return nil
+}
+
+// [END timer_output_timestamps_bad]
+
+// [START timer_output_timestamps_good]
+
+type element[V any] struct {
+	Timestamp int64
+	Value     V
+}
+
+type goodTimerOutputTimestampsFn[V any] struct {
+	ElementBag        state.Bag[element[V]]                // The bag of elements accumulated.
+	TimerTimerstamp   state.Value[int64]                   // The timestamp of the timer set.
+	MinTimestampInBag state.Combining[int64, int64, int64] // The minimum timestamp stored in the bag.
+	OutputState       timers.ProcessingTime                // The timestamp of the timer.
+}
+
+func (fn *goodTimerOutputTimestampsFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) error {
+	// ...
+	// Add the current element to the bag for this key, and preserve the event time.
+	if err := fn.ElementBag.Add(sp, element[V]{Timestamp: et.Milliseconds(), Value: value}); err != nil {
+		return err
+	}
+
+	// Keep track of the minimum element timestamp currently stored in the bag.
+	fn.MinTimestampInBag.Add(sp, et.Milliseconds())
+
+	// If the timer is already set, then reset it at the same time but with an updated output timestamp (otherwise
+	// we would keep resetting the timer to the future). If there is no timer set, then set one to expire in a minute.
+	ts, ok, _ := fn.TimerTimerstamp.Read(sp)
+	var tsToSet time.Time
+	if ok {
+		tsToSet = time.UnixMilli(ts)
+	} else {
+		tsToSet = time.Now().Add(1 * time.Minute)
+	}
+
+	minTs, _, _ := fn.MinTimestampInBag.Read(sp)
+	outputTs := time.UnixMilli(minTs)
+
+	// Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the
+	// timer fires. This allows outputting all the elements with their timestamp.
+	fn.OutputState.Set(tp, tsToSet, timers.WithOutputTimestamp(outputTs))
+	fn.TimerTimerstamp.Write(sp, tsToSet.UnixMilli())
+
+	return nil
+}
+
+func (fn *goodTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) error {
+	switch timer.Family {
+	case fn.OutputState.Family:
+		vs, _, err := fn.ElementBag.Read(sp)
+		if err != nil {
+			return err
+		}
+		for _, v := range vs {
+			// Output each element with their timestamp
+			emit(beam.EventTime(v.Timestamp), fmt.Sprintf("%v", v.Value))
+		}
+
+		fn.ElementBag.Clear(sp)
+		// Note that the timer has now fired.
+		fn.TimerTimerstamp.Clear(sp)
+	}
+	return nil
+}
+
+func AddTimedOutputBatching[V any](s beam.Scope, in beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, &goodTimerOutputTimestampsFn[V]{
+		ElementBag:      state.MakeBagState[element[V]]("elementBag"),
+		TimerTimerstamp: state.MakeValueState[int64]("timerTimestamp"),
+		MinTimestampInBag: state.MakeCombiningState[int64, int64, int64]("minTimestampInBag", func(a, b int64) int64 {
+			if a < b {
+				return a
+			}
+			return b
+		}),
+		OutputState: timers.InProcessingTime("outputState"),
+	}, in)
+}
+
+// [END timer_output_timestamps_good]
+
+// updateState exists for example purposes only
+func updateState(sp, state, k, v any) {}
+
+// [START timer_garbage_collection]
+
+type timerGarbageCollectionFn[V any] struct {
+	State             state.Value[V]                       // The state for the key.
+	MaxTimestampInBag state.Combining[int64, int64, int64] // The maximum element timestamp seen so far.
+	GcTimer           timers.EventTime                     // The timestamp of the timer.
+}
+
+func (fn *timerGarbageCollectionFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) {
+	updateState(sp, fn.State, key, value)
+	fn.MaxTimestampInBag.Add(sp, et.Milliseconds())
+
+	// Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
+	// as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
+	// worth of event time (as measured by the watermark), then the gc timer will fire.
+	maxTs, _, _ := fn.MaxTimestampInBag.Read(sp)
+	expirationTime := time.UnixMilli(maxTs).Add(1 * time.Hour)
+	fn.GcTimer.Set(tp, expirationTime)
 }
 
+func (fn *timerGarbageCollectionFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) {
+	switch timer.Family {
+	case fn.GcTimer.Family:
+		// Clear all the state for the key
+		fn.State.Clear(sp)
+		fn.MaxTimestampInBag.Clear(sp)
+	}
+}
+
+func AddTimerGarbageCollection[V any](s beam.Scope, in beam.PCollection) beam.PCollection {
+	return beam.ParDo(s, &timerGarbageCollectionFn[V]{
+		State: state.MakeValueState[V]("timerTimestamp"),
+		MaxTimestampInBag: state.MakeCombiningState[int64, int64, int64]("maxTimestampInBag", func(a, b int64) int64 {
+			if a > b {
+				return a
+			}
+			return b
+		}),
+		GcTimer: timers.InEventTime("gcTimer"),
+	}, in)
+}
+
+// [END timer_garbage_collection]
+
+type Event struct{}
+
+func (*Event) isClick() bool {}

Review Comment:
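   The precommit build may be failing because `isClick` is declared to return a `bool` but its body is empty, so it never returns a value (Go rejects this with a "missing return" compile error).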



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org