Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/05/19 01:31:56 UTC

[GitHub] [beam] lostluck opened a new pull request, #26782: [#22737] Fit & Finish for Go SDK timer support.

lostluck opened a new pull request, #26782:
URL: https://github.com/apache/beam/pull/26782

   DO NOT MERGE: Still need unit tests for the ParDo.processTimers method.
   -------
   
   Cleanup and fit & finish work for Go SDK timers. 
   
   * Adds a timers.Context for providing the Family & Tag fields.
   * Adds a ClearTag method to EventTime and ProcessingTime timers to support clearing a timer for a specific tag.
   * Updates the timer_wordcap example
     * Use a finite Periodic sequence to get elements.
     * Demonstrate element batching with State & Timers.
     * Add verbose logging to clarify execution for example purposes.
     * Demonstrate multiple tags set for the same timer.
   * Simplifies user key coderID sourcing by moving it entirely to pipeline serialization time.
     * The current Beam model is more general about timer key and window coders, but in practice they are implicitly tied to the DoFn's parallel input key.
   * Fetches coders at execution time from the timer_family_spec in the bundle descriptor returned by the runner.
     * This avoids relying on the previous implementation detail, and also allows the runner to replace the user key coder as needed, for example to add length prefixes.
   * Handles inserted timers similarly to the [Java SDK](https://github.com/apache/beam/blob/f154d0165d31068f9e0d2ae008d7823dcf4cf53a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1667-L1734), allowing timers to be quickly re-looped when applicable.
     * Supporting this is the bulk of the PR's complexity, but this ordering is required for correctness and to avoid double firing.
   * Ensures keys are not repeatedly encoded/decoded within a given inline process.
     * Key bytes from fired timers are re-used when writing new timers.
   * Removes the unnecessary UserTimerAdapter interface, as all uses are in the exec package.
   * Ensures all timers from a given timer batch are parsed.
   * Changes timers.ProcessingTime and timers.EventTime to use value receivers instead of pointer receivers, since they have no mutable state.
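The ordering rule behind "handles inserted timers" can be illustrated with a priority queue. What follows is a minimal sketch under stated assumptions, not the SDK's implementation: it uses hypothetical types (`firing`, `firingHeap`, `processTimers`) and an `int64` stand-in for event time, and it assumes that a timer the user sets in-line may only fire within the same pass if it is earlier than the latest runner-sent firing. A new Set replaces any pending firing of the same Key+Tag, which is what prevents double firing.

```go
package main

import (
	"container/heap"
	"fmt"
)

// firing is a hypothetical timer firing: a timer identity (Key+Tag) and a firing time.
type firing struct {
	Key, Tag string
	FireAt   int64 // stand-in for an event-time timestamp
	seq      int   // insertion order; identifies the live firing of a timer
}

func (f firing) id() string { return f.Key + "/" + f.Tag }

// firingHeap is a container/heap min-heap ordered by FireAt, then insertion order.
type firingHeap []firing

func (h firingHeap) Len() int { return len(h) }
func (h firingHeap) Less(i, j int) bool {
	if h[i].FireAt != h[j].FireAt {
		return h[i].FireAt < h[j].FireAt
	}
	return h[i].seq < h[j].seq
}
func (h firingHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *firingHeap) Push(x any)   { *h = append(*h, x.(firing)) }
func (h *firingHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// processTimers fires timers in time order. onFire returns the timers that user
// code sets while handling f. A new firing earlier than the latest runner-sent
// firing is fired in-line; later ones are deferred back to the runner. Setting a
// timer again replaces any pending firing of that same timer.
func processTimers(runnerSent []firing, onFire func(firing) []firing) (fired []firing, deferred map[string]firing) {
	h := &firingHeap{}
	live := map[string]int{} // timer id -> seq of its live firing
	deferred = map[string]firing{}
	var horizon int64 // latest runner-sent firing time
	seq := 0
	schedule := func(f firing) {
		f.seq = seq
		seq++
		live[f.id()] = f.seq
		heap.Push(h, f)
	}
	for _, f := range runnerSent {
		if f.FireAt > horizon {
			horizon = f.FireAt
		}
		schedule(f)
	}
	for h.Len() > 0 {
		f := heap.Pop(h).(firing)
		if live[f.id()] != f.seq {
			continue // overridden; a real implementation would also record a clear for it
		}
		fired = append(fired, f)
		for _, next := range onFire(f) {
			if next.FireAt < horizon {
				delete(deferred, next.id())
				schedule(next)
			} else {
				live[next.id()] = -1 // invalidate any pending in-line firing
				deferred[next.id()] = next
			}
		}
	}
	return fired, deferred
}

func main() {
	runner := []firing{{Key: "k", Tag: "a", FireAt: 10}, {Key: "k", Tag: "b", FireAt: 20}}
	fired, deferred := processTimers(runner, func(f firing) []firing {
		if f.Tag == "a" {
			// Firing "a" moves "b" earlier, ahead of the runner-sent b@20.
			return []firing{{Key: "k", Tag: "b", FireAt: 15}}
		}
		return nil
	})
	for _, f := range fired {
		fmt.Println(f.id(), f.FireAt)
	}
	fmt.Println("deferred:", len(deferred))
}
```

The skipped runner-sent `b@20` is the "override any subsequent firing of the same timer" case: without the `live` check it would fire a second time.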
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213617691


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -158,6 +161,7 @@ func (n *ParDo) ProcessElement(_ context.Context, elm *FullValue, values ...ReSt
 // a ParDo's ProcessElement functionality with their own construction of
 // MainInputs.
 func (n *ParDo) processMainInput(mainIn *MainInput) error {
+	n.TimerTracker.SetCurrentKey(mainIn)

Review Comment:
   I folded that check into the SetCurrentKey call itself instead.





[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1574035444

   Run GoPortable PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1553888864

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1574035339

   Run Go PreCommit




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1560074131

   Run Go PostCommit




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1572702380

   Looks like Jenkins is broken for reasons unrelated to this PR.




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1574091125

   Run RAT PreCommit




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1553888245

   R: @riteshghorse 
   
   I'm aware you have release things to do, so no rush on this. I still need to add unit tests for the processTimer method loop, but otherwise this should get timers to the finish line.




[GitHub] [beam] riteshghorse commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1568649534

   Thank you for getting this through, @lostluck!
   I got busy with the release, but I'll take a look soon and get this merged this week.




[GitHub] [beam] lostluck merged pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #26782:
URL: https://github.com/apache/beam/pull/26782




[GitHub] [beam] codecov[bot] commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1572754964

   ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#26782](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (9e7713a) into [master](https://app.codecov.io/gh/apache/beam/commit/f2400c828c390594d712a398c80aa22a635652fc?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (f2400c8) will **decrease** coverage by `0.59%`.
   > The diff coverage is `76.97%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #26782      +/-   ##
   ==========================================
   - Coverage   71.99%   71.41%   -0.59%     
   ==========================================
     Files         747      766      +19     
     Lines      101306   103575    +2269     
   ==========================================
   + Hits        72936    73965    +1029     
   - Misses      26911    28062    +1151     
   - Partials     1459     1548      +89     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `53.41% <76.97%> (-0.55%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/graph/edge.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL2dyYXBoL2VkZ2UuZ28=) | `3.35% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/datasource.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhc291cmNlLmdv) | `66.79% <0.00%> (-2.15%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/translate.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy90cmFuc2xhdGUuZ28=) | `23.24% <0.00%> (-0.50%)` | :arrow_down: |
   | [sdks/go/pkg/beam/pardo.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9wYXJkby5nbw==) | `41.79% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/graphx/translate.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZ3JhcGh4L3RyYW5zbGF0ZS5nbw==) | `37.07% <10.52%> (-0.93%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/coder.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9jb2Rlci5nbw==) | `58.52% <50.00%> (+0.49%)` | :arrow_up: |
   | [sdks/go/pkg/beam/core/runtime/exec/pardo.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9wYXJkby5nbw==) | `64.08% <79.45%> (+4.48%)` | :arrow_up: |
   | [sdks/go/pkg/beam/core/runtime/exec/timers.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy90aW1lcnMuZ28=) | `93.20% <92.90%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/fn.go](https://app.codecov.io/gh/apache/beam/pull/26782?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mbi5nbw==) | `71.63% <100.00%> (+0.86%)` | :arrow_up: |
   
   ... and [69 files with indirect coverage changes](https://app.codecov.io/gh/apache/beam/pull/26782/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [beam] riteshghorse commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213300505


##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -139,12 +154,17 @@ func (pt ProcessingTime) Clear(p Provider) {
 	p.Set(TimerMap{Family: pt.Family, Clear: true})
 }
 
+// Clear clears this timer for the given tag.

Review Comment:
   ```suggestion
   // ClearTag clears this timer for the given tag.
   ```



##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -105,10 +115,15 @@ func (et *EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOpt
 }
 
 // Clear clears this timer.
-func (et *EventTime) Clear(p Provider) {
+func (et EventTime) Clear(p Provider) {
 	p.Set(TimerMap{Family: et.Family, Clear: true})
 }
 
+// Clear clears this timer for the given tag.

Review Comment:
   ```suggestion
   // ClearTag clears this timer for the given tag.
   ```
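The hunk above switches `EventTime.Clear` to a value receiver and adds a tag-specific clear. A minimal sketch of why value receivers suffice when a type holds only immutable data, using hypothetical stand-ins (`eventTime`, `recorder`, `timerMap`) rather than the real `timers` package types: the method only reads `Family`, so operating on a copy changes nothing, and value-receiver methods remain callable through a pointer.

```go
package main

import "fmt"

// timerMap is a hypothetical stand-in for a timer modification record.
type timerMap struct {
	Family string
	Tag    string
	Clear  bool
}

// recorder is a hypothetical provider that records timer modifications.
type recorder struct{ mods []timerMap }

func (r *recorder) Set(t timerMap) { r.mods = append(r.mods, t) }

// eventTime holds only its family name, which never changes after
// construction, so value receivers are sufficient.
type eventTime struct{ Family string }

// Clear clears this timer.
func (et eventTime) Clear(p *recorder) { p.Set(timerMap{Family: et.Family, Clear: true}) }

// ClearTag clears this timer for the given tag.
func (et eventTime) ClearTag(p *recorder, tag string) {
	p.Set(timerMap{Family: et.Family, Tag: tag, Clear: true})
}

func main() {
	var p recorder
	et := eventTime{Family: "flush"}
	et.Clear(&p)
	ptr := &et
	ptr.ClearTag(&p, "batch-1") // value-receiver methods are also callable via a pointer
	fmt.Println(len(p.mods), p.mods[1].Tag)
}
```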



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
 	return val, nil
 }
 
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, bcr *byteCountReader) (*FullValue, error) {
-	timerAdapter, ok := n.Timer.(*userTimerAdapter)
-	if !ok {
-		return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle, handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, error) {
+	var bundleTimers []TimerRecv
+	for {
+		tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+		if err != nil {
+			if goerrors.Is(err, io.EOF) {
+				break
+			}
+			return nil, errors.WithContext(err, "error decoding received timer callback")
+		}
+		bundleTimers = append(bundleTimers, tmap)
 	}
-	tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+	return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. They also
+// have a fireing time, and a data watermark hold time. The SDK doesn't determine
+// if a timer is ready to fire or not, that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process, the user
+// code may set additional firings for one or more timers, which may overwrite orderings
+// from the runner.
+//
+// In particular, if runner sent timer produces a new firing that is earlier than a 2nd runner sent timer,
+// then it is processed before that 2nd timer. This will override any subsequent firing of the same timer,
+// and as a result, must add a clear to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+	// Lookup actual domain for family here.
+	spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+	bundleTimers, err := decodeBundleTimers(spec, r)
 	if err != nil {
-		return nil, errors.WithContext(err, "error decoding received timer callback")
+		return err
 	}
+	for _, tmap := range bundleTimers {
+		n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+		for i, w := range tmap.Windows {
+			ws := tmap.Windows[i : i+1]

Review Comment:
   Is it okay to use `w` here instead of `tmap.Windows[i : i+1]`?



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -249,6 +253,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
 	if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
 		return n.fail(err)
 	}
+	// Flush timers if any.
+	if err := n.TimerTracker.FlushAndReset(n.ctx, n.timerManager); err != nil {

Review Comment:
   Similar comment as above: do we need to check for nil here?



##########
sdks/go/pkg/beam/core/runtime/exec/timers.go:
##########
@@ -16,112 +16,314 @@
 package exec
 
 import (
+	"bytes"
+	"container/heap"
 	"context"
-	"fmt"
 	"io"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
-// UserTimerAdapter provides a timer provider to be used for manipulating timers.
-type UserTimerAdapter interface {
-	NewTimerProvider(ctx context.Context, manager DataManager, inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput) (timerProvider, error)
+type userTimerAdapter struct {
+	sID           StreamID
+	familyToSpec  map[string]timerFamilySpec
+	modifications map[windowKeyPair]*timerModifications
+
+	currentKey       any
+	keyEncoded       bool
+	buf              bytes.Buffer
+	currentKeyString string
 }
 
-type userTimerAdapter struct {
-	sID StreamID
-	ec  ElementEncoder
-	dc  ElementDecoder
-	wc  WindowDecoder
+type timerFamilySpec struct {
+	Domain     timers.TimeDomain
+	KeyEncoder ElementEncoder
+	KeyDecoder ElementDecoder
+	WinEncoder WindowEncoder
+	WinDecoder WindowDecoder
 }
 
-// NewUserTimerAdapter returns a user timer adapter for the given StreamID and timer coder.
-func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoder *coder.Coder) UserTimerAdapter {
-	if !coder.IsW(c) {
-		panic(fmt.Sprintf("expected WV coder for user timer %v: %v", sID, c))
+func newTimerFamilySpec(domain timers.TimeDomain, timerCoder *coder.Coder) timerFamilySpec {
+	keyCoder := timerCoder.Components[0]
+	return timerFamilySpec{
+		Domain:     domain,
+		KeyEncoder: MakeElementEncoder(keyCoder),
+		KeyDecoder: MakeElementDecoder(keyCoder),
+		WinEncoder: MakeWindowEncoder(timerCoder.Window),
+		WinDecoder: MakeWindowDecoder(timerCoder.Window),
 	}
-	ec := MakeElementEncoder(timerCoder)
-	dc := MakeElementDecoder(coder.SkipW(c).Components[0])
-	wc := MakeWindowDecoder(c.Window)
-	return &userTimerAdapter{sID: sID, ec: ec, wc: wc, dc: dc}
 }
 
-// NewTimerProvider creates and returns a timer provider to set/clear timers.
-func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager DataManager, inputTs typex.EventTime, w []typex.Window, element *MainInput) (timerProvider, error) {
-	userKey := &FullValue{Elm: element.Key.Elm}
-	tp := timerProvider{
-		ctx:                 ctx,
-		tm:                  manager,
-		userKey:             userKey,
-		inputTimestamp:      inputTs,
-		sID:                 u.sID,
-		window:              w,
-		writersByFamily:     make(map[string]io.Writer),
-		timerElementEncoder: u.ec,
-		keyElementDecoder:   u.dc,
+// newUserTimerAdapter returns a user timer adapter for the given StreamID and timer coder.
+func newUserTimerAdapter(sID StreamID, familyToSpec map[string]timerFamilySpec) *userTimerAdapter {
+	return &userTimerAdapter{sID: sID, familyToSpec: familyToSpec}
+}
+
+// SetCurrentKey keeps the key around so we can encoded if needed for timers.
+func (u *userTimerAdapter) SetCurrentKey(mainIn *MainInput) {
+	if u == nil {
+		return
 	}
+	u.currentKey = mainIn.Key.Elm
+	u.keyEncoded = false
+}
 
-	return tp, nil
+// SetCurrentKeyString is for processing timer callbacks, and avoids re-encoding the key.

Review Comment:
   Nice optimization here.



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -158,6 +161,7 @@ func (n *ParDo) ProcessElement(_ context.Context, elm *FullValue, values ...ReSt
 // a ParDo's ProcessElement functionality with their own construction of
 // MainInputs.
 func (n *ParDo) processMainInput(mainIn *MainInput) error {
+	n.TimerTracker.SetCurrentKey(mainIn)

Review Comment:
   Should we check `n.TimerTracker` for nil here?
   





[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1574097985

   The RAT failures are spurious and the Go precommits are now passing on jenkins. Merging!




[GitHub] [beam] lostluck commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213620890


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
 	return val, nil
 }
 
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, bcr *byteCountReader) (*FullValue, error) {
-	timerAdapter, ok := n.Timer.(*userTimerAdapter)
-	if !ok {
-		return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle, handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, error) {
+	var bundleTimers []TimerRecv
+	for {
+		tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+		if err != nil {
+			if goerrors.Is(err, io.EOF) {
+				break
+			}
+			return nil, errors.WithContext(err, "error decoding received timer callback")
+		}
+		bundleTimers = append(bundleTimers, tmap)
 	}
-	tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+	return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. They also
+// have a fireing time, and a data watermark hold time. The SDK doesn't determine
+// if a timer is ready to fire or not, that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process, the user
+// code may set additional firings for one or more timers, which may overwrite orderings
+// from the runner.
+//
+// In particular, if runner sent timer produces a new firing that is earlier than a 2nd runner sent timer,
+// then it is processed before that 2nd timer. This will override any subsequent firing of the same timer,
+// and as a result, must add a clear to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+	// Lookup actual domain for family here.
+	spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+	bundleTimers, err := decodeBundleTimers(spec, r)
 	if err != nil {
-		return nil, errors.WithContext(err, "error decoding received timer callback")
+		return err
 	}
+	for _, tmap := range bundleTimers {
+		n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+		for i, w := range tmap.Windows {
+			ws := tmap.Windows[i : i+1]

Review Comment:
   Alas, it is not. w is an individual `typex.Window`, but we need a `[]typex.Window` instead for other reasons. So we re-use the backing array behind the slice decoded from the timer, and avoid an allocation.
   
   Since the windows are not writable (they're copied by value anyway), this is safe for the downstream calls.
   
   There's probably some further cleanup we could do for the various window passing we do in this file (among other cleanups for the file), but this does the correct thing.
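The reply above relies on Go slice semantics: `tmap.Windows[i : i+1]` is a length-1 view onto the decoded slice's backing array, so no new array is allocated. A minimal sketch with a hypothetical `window` type standing in for `typex.Window` and a hypothetical `processWindow` downstream call. It also shows a caveat: with the two-index form, the view's capacity runs to the end of the array, so an `append` on it would overwrite the next window; the full slice expression `windows[i : i+1 : i+1]` caps the capacity and forces `append` to copy instead.

```go
package main

import "fmt"

// window is a hypothetical stand-in for typex.Window values.
type window struct{ start, end int64 }

// processWindow is a hypothetical downstream call that needs a slice of windows.
func processWindow(ws []window) int { return len(ws) }

func main() {
	windows := []window{{0, 10}, {10, 20}, {20, 30}}
	for i := range windows {
		ws := windows[i : i+1] // len 1, shares windows' backing array
		fmt.Println(processWindow(ws), &ws[0] == &windows[i])
	}

	view := windows[0:1] // cap 3: append writes into windows[1]
	_ = append(view, window{99, 100})
	fmt.Println(windows[1]) // now {99 100}

	capped := windows[1:2:2] // cap 1: append must allocate a new array
	_ = append(capped, window{7, 8})
	fmt.Println(windows[2]) // still {20 30}
}
```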





[GitHub] [beam] lostluck commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213625552


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -249,6 +253,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
 	if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
 		return n.fail(err)
 	}
+	// Flush timers if any.
+	if err := n.TimerTracker.FlushAndReset(n.ctx, n.timerManager); err != nil {

Review Comment:
   Same thing: it's checked inside FlushAndReset.
   
   The reason we can get away with this is that in Go, the receiver is just another parameter, so it's valid to call a pointer-receiver method on a nil pointer. That lets us fold the nil check into the method itself instead of doing it at every call site.
   
   The main reason not to do this is performance: when a value is always non-nil by construction, you never need a nil check anywhere. We can't guarantee that here, so we'd need to check on every call anyway, and I folded the check into the method definition to keep pardo.go a little tidier. It's complicated enough already.
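The nil-receiver technique described above can be shown in isolation. A minimal sketch with hypothetical `tracker` and `parDo` types (not the SDK's `userTimerAdapter` or `ParDo`): the methods use pointer receivers and return early on nil, so callers never check. Note that this only works for pointer receivers; calling a value-receiver method through a nil pointer dereferences it and panics.

```go
package main

import "fmt"

// tracker is a hypothetical per-ParDo timer tracker that is nil when the
// DoFn declares no timers.
type tracker struct {
	currentKey any
	flushes    int
}

// SetCurrentKey is a no-op on a nil *tracker, so callers need no nil check.
func (t *tracker) SetCurrentKey(key any) {
	if t == nil {
		return
	}
	t.currentKey = key
}

// FlushAndReset is likewise safe to call on a nil receiver.
func (t *tracker) FlushAndReset() error {
	if t == nil {
		return nil
	}
	t.flushes++
	t.currentKey = nil
	return nil
}

type parDo struct{ timers *tracker } // timers is nil when there are no timers

func (p *parDo) process(key any) error {
	p.timers.SetCurrentKey(key) // no nil check needed at the call site
	return p.timers.FlushAndReset()
}

func main() {
	withTimers := &parDo{timers: &tracker{}}
	without := &parDo{}
	fmt.Println(withTimers.process("k"), without.process("k"), withTimers.timers.flushes)
}
```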





[GitHub] [beam] riteshghorse commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213735552


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
 	return val, nil
 }
 
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, bcr *byteCountReader) (*FullValue, error) {
-	timerAdapter, ok := n.Timer.(*userTimerAdapter)
-	if !ok {
-		return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle, handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, error) {
+	var bundleTimers []TimerRecv
+	for {
+		tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+		if err != nil {
+			if goerrors.Is(err, io.EOF) {
+				break
+			}
+			return nil, errors.WithContext(err, "error decoding received timer callback")
+		}
+		bundleTimers = append(bundleTimers, tmap)
 	}
-	tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+	return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. Timers also
+// have a firing time and a data watermark hold time. The SDK doesn't determine
+// whether a timer is ready to fire; that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process, the user
+// code may set additional firings for one or more timers, which may overwrite orderings
+// from the runner.
+//
+// In particular, if a runner-sent timer produces a new firing that is earlier than a second runner-sent timer,
+// then the new firing is processed before that second timer. This overrides any subsequent firing of the same timer,
+// and as a result, a clear must be added to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+	// Lookup actual domain for family here.
+	spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+	bundleTimers, err := decodeBundleTimers(spec, r)
 	if err != nil {
-		return nil, errors.WithContext(err, "error decoding received timer callback")
+		return err
 	}
+	for _, tmap := range bundleTimers {
+		n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+		for i, w := range tmap.Windows {
+			ws := tmap.Windows[i : i+1]

Review Comment:
   got it





[GitHub] [beam] riteshghorse commented on a diff in pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213735674


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -249,6 +253,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
 	if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
 		return n.fail(err)
 	}
+	// Flush timers if any.
+	if err := n.TimerTracker.FlushAndReset(n.ctx, n.timerManager); err != nil {

Review Comment:
   okay





[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1568732366

   Release definitely comes first. :) 




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1560075077

   @riteshghorse this should now be ready for a look, tested and all.




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1572869244

   Run RAT PreCommit




[GitHub] [beam] lostluck commented on pull request #26782: [#22737] Fit & Finish for Go SDK timer support.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #26782:
URL: https://github.com/apache/beam/pull/26782#issuecomment-1574030536

   Run RAT PreCommit

