Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2024/04/19 17:35:19 UTC

[PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

lostluck opened a new pull request, #31057:
URL: https://github.com/apache/beam/pull/31057

   Light refactor to how Residuals are handled in the element manager, to prepare for deferring their processing via processing time.
   
   Moves timer triage to its own method as well, to prepare for the additional processing time handling code.
   
   PR doesn't make any behavioral changes, just code moves and restructuring with types.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576857084


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
 	return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+	Element []byte
+	Delay   time.Duration // The relative time delay.
+	Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
+	Data                 []Residual
+	TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
+	MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)

Review Comment:
   There's no such thing as a "per window watermark", and windows themselves are only meaningful at aggregation boundaries. As such, no, we won't add windows to this.
   
   Per the model/protos: the MinOutputWatermarks are set on a per-output-PCollection basis, based on the estimated watermarks provided by the user code. So that string key refers to the local output identifier.
   
   In practice, the watermark for a single transform is the same for all outputs.
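
A minimal sketch of the aggregation described above, under stated assumptions: `millis` is a hypothetical stand-in for the watermark type (the real field uses `mtime.Time`), and `residualReport` and `aggregateMinWatermarks` are illustrative names, not prism code. It folds per-residual watermarks into one map keyed by local output ID, keeping the minimum per key.

```go
package main

import "fmt"

// millis is a hypothetical stand-in for a watermark type:
// milliseconds since the Unix epoch.
type millis int64

// residualReport is a hypothetical per-residual report: the unprocessed
// element plus estimated watermarks keyed by local output ID.
type residualReport struct {
	Element          []byte
	OutputWatermarks map[string]millis
}

// aggregateMinWatermarks folds per-residual watermarks into a single map,
// keeping the earliest watermark seen for each local output ID.
func aggregateMinWatermarks(reports []residualReport) map[string]millis {
	mins := make(map[string]millis)
	for _, r := range reports {
		for out, wm := range r.OutputWatermarks {
			if cur, ok := mins[out]; !ok || wm < cur {
				mins[out] = wm
			}
		}
	}
	return mins
}

func main() {
	reports := []residualReport{
		{Element: []byte("a"), OutputWatermarks: map[string]millis{"out0": 5000, "out1": 5000}},
		{Element: []byte("b"), OutputWatermarks: map[string]millis{"out0": 2000, "out1": 2000}},
	}
	mins := aggregateMinWatermarks(reports)
	fmt.Println(mins["out0"], mins["out1"]) // prints "2000 2000"
}
```

Note that the keys are output identifiers, not windows; each residual here reports the same watermark for all of its outputs, matching the "in practice" remark above.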





Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576847287


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
 	return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+	Element []byte
+	Delay   time.Duration // The relative time delay.
+	Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {
+	Data                 []Residual
+	TransformID, InputID string                // Prism only allows one SDF at the root of a bundledescriptor so there should only be one each.
+	MinOutputWatermarks  map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)

Review Comment:
   Does the string key reference the bundle's window? If so, could we add this? Also, does the `Min` in the name reflect that the minimum element timestamp is what we consider an input or output watermark? Finally, I struggled with `technically per Residual` and `aggregated`. Is there an important nuance here, i.e. cases where it is not 1:1 per Residual? What is being aggregated?



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
 	return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+	Element []byte
+	Delay   time.Duration // The relative time delay.
+	Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {

Review Comment:
   Could we say instead, using a symbol link: "Residuals stores remaining [Residual]s, their aggregated Watermarks, and references needed to query their associated transforms"?
   
   "Process continuations" raised a question for me, coming from Java, where ProcessContinuation is the return type for all ProcessElement lifecycle methods, whether or not the developer uses the void return type:
   Is `Residuals` instantiated even when a DoFn does not explicitly set a process timer? In other words, do the code paths converge to using Residuals, both when remaining elements exist in a bundle and when process timers are explicitly configured for a DoFn?





Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #31057:
URL: https://github.com/apache/beam/pull/31057#issuecomment-2073438453

   Thanks! Do let me know if you have follow up questions.




Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #31057:
URL: https://github.com/apache/beam/pull/31057#discussion_r1576877619


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -532,12 +533,26 @@ func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
 	return ret
 }
 
+// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
+type Residual struct {
+	Element []byte
+	Delay   time.Duration // The relative time delay.
+	Bounded bool          // Whether this element is finite or not.
+}
+
+// Residuals is used to specify process continuations within a bundle.
+type Residuals struct {

Review Comment:
   Sure. I haven't tried using the Go 1.19 godoc improvements yet. But be aware these are internal packages, and it's very unlikely that someone will ever look at the godoc for them rather than simply reading the code directly.
   
   I'll do this in a subsequent PR since this change is pre-work, and far from its final shape.
   
   -----
   
   Short answer: No. It's not instantiated in any meaningful way if the user code doesn't return a ProcessContinuation with a delay or similar, since the SDK doesn't return residuals if the user doesn't explicitly use a ProcessContinuation.
   
   Timers are an orthogonal, more primitive concept. One could implement processing continuations in terms of ProcessingTime timers and State. That might be how this ultimately ends up working. I don't know yet.
   
   Residuals is a type, so to talk about instantiation you need to point to a specific instance. It's also a small type with 4 fields. If the map and slice remain uninstantiated themselves, then they are nil. Go struct instantiations are very, very cheap.
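
A small sketch of that point, with the assumptions stated up front: the struct shapes are copied from the diff above, except that `MinOutputWatermarks` uses `int64` in place of `mtime.Time` so the example is self-contained, and `hasResiduals` is a hypothetical helper, not prism code. It shows that the zero value of `Residuals` allocates neither the slice nor the map, and that both are still safe to read.

```go
package main

import (
	"fmt"
	"time"
)

// Residual mirrors the struct in the diff above.
type Residual struct {
	Element []byte
	Delay   time.Duration
	Bounded bool
}

// Residuals mirrors the struct in the diff above, with int64 standing in
// for mtime.Time (an assumption made to keep this example self-contained).
type Residuals struct {
	Data                 []Residual
	TransformID, InputID string
	MinOutputWatermarks  map[string]int64
}

// hasResiduals is a hypothetical helper: a bundle has process
// continuations only if the SDK returned at least one residual.
func hasResiduals(rs Residuals) bool {
	return len(rs.Data) > 0
}

func main() {
	var rs Residuals // zero value: no slice or map is allocated

	fmt.Println(rs.Data == nil, rs.MinOutputWatermarks == nil) // prints "true true"
	fmt.Println(len(rs.Data), len(rs.MinOutputWatermarks))     // prints "0 0"

	// Reading from a nil map is safe; only writing to it would panic.
	_, ok := rs.MinOutputWatermarks["out0"]
	fmt.Println(ok, hasResiduals(rs)) // prints "false false"
}
```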





Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #31057:
URL: https://github.com/apache/beam/pull/31057#issuecomment-2073335870

   Friendly ping!




Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #31057:
URL: https://github.com/apache/beam/pull/31057#issuecomment-2067056283

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #31057:
URL: https://github.com/apache/beam/pull/31057#issuecomment-2067054890

   R: @damondouglas 




Re: [PR] [#30083][prism] Refactor Residuals to prepare for processing time scheduling. [beam]

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #31057:
URL: https://github.com/apache/beam/pull/31057

