Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/10 01:05:48 UTC

[GitHub] [beam] youngoli opened a new pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

youngoli opened a new pull request #13070:
URL: https://github.com/apache/beam/pull/13070


   Adjust Go SDF splitting and execution code so that splits take exploded windows into consideration and perform splits at window boundaries. Also required adjusting various split logic to accept multiple residuals and primaries from splits.
   
   The changes to the splitting logic itself are in exec/sdf.go, and the tests are in the corresponding test file. The rest of the changes mostly adjust other code to handle the move from a single primary and residual to multiple primaries and residuals, or are small bugfixes (like offsetrange.Tracker.GetProgress).
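As a rough illustration of splitting at window boundaries, here is a minimal sketch in Go. It assumes every exploded window carries an equal amount of work and that a split landing in a later window is rounded to the nearest window boundary. `findSplit` and `splitPoint` are hypothetical names, and the actual logic in exec/sdf.go may differ.

```go
package main

import (
	"fmt"
	"math"
)

// splitPoint is a hypothetical description of where a split lands; it is
// not a type from the Beam Go SDK.
type splitPoint struct {
	AtBoundary  bool    // true: split between windows; false: split inside Window
	Window      int     // first residual window (boundary) or the window to split
	SubFraction float64 // fraction of Window's remaining work kept in the primary
}

// findSplit assumes every exploded window carries the same work (1.0 units).
// currW is the window being processed, currProg is the fraction of currW
// already done, and frac is the fraction of the remaining work to keep in
// the primary. It returns false when no residual would be left.
func findSplit(numW, currW int, currProg, frac float64) (splitPoint, bool) {
	leftInCurr := 1 - currProg
	remaining := leftInCurr + float64(numW-currW-1)
	keep := frac * remaining
	if keep < leftInCurr {
		// The split lands inside the current window: split its restriction.
		return splitPoint{Window: currW, SubFraction: keep / leftInCurr}, true
	}
	// Otherwise round to the nearest boundary in a later window so that no
	// unstarted window is divided; windows [0, b) stay in the primary.
	b := currW + 1 + int(math.Round(keep-leftInCurr))
	if b >= numW {
		return splitPoint{}, false
	}
	return splitPoint{AtBoundary: true, Window: b}, true
}

func main() {
	// Four windows, halfway through window 0, keep half the remaining work.
	sp, ok := findSplit(4, 0, 0.5, 0.5)
	fmt.Println(sp, ok) // prints {true 2 0} true
}
```

In this model, a split inside the current window yields a primary and residual for that window's restriction plus whole windows on each side, which is why the split results become slices.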
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #13070:
URL: https://github.com/apache/beam/pull/13070#issuecomment-707381921


   Run Go Postcommit





[GitHub] [beam] youngoli merged pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #13070:
URL: https://github.com/apache/beam/pull/13070


   





[GitHub] [beam] youngoli commented on pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #13070:
URL: https://github.com/apache/beam/pull/13070#issuecomment-706460327


   R: @lostluck 
   R: @robertwb (For the split logic mainly, you can find that in exec/sdf.go)





[GitHub] [beam] youngoli commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503530205



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -354,12 +354,12 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
 		return SplitResult{PI: s - 1, RI: s}, nil
 	}
 	// Otherwise, perform a sub-element split.
-	p, r, err := su.Split(fr)
+	ps, rs, err := su.Split(fr)
 	if err != nil {
 		return SplitResult{}, err
 	}
 
-	if p == nil || r == nil { // Unsuccessful split.
+	if len(ps) == 0 || len(rs) == 0 { // Unsuccessful split.

Review comment:
       Mostly because, once I moved to slices, an empty slice seemed a clearer way to indicate "the function worked, but the result was empty" than a nil slice, which is what gets returned on errors. I could switch it back to nil, since that was mostly a stylistic choice.
   
   Checking over the code, yeah, it looks like if one is empty I always return both empty. But I used an "or" here to avoid relying on implementation details of the split. Basically, if even one of them is missing, then DataSource knows the split failed and doesn't need to know exactly what happened.
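A minimal sketch of the convention described above: nil slices accompany an error, empty slices mean the split simply didn't happen, and the caller treats a missing primary or residual as failure without inspecting why. `trySplit` and `splitSucceeded` are hypothetical stand-ins, not Beam APIs, and the "fraction 1 cannot split" rule is invented purely for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// trySplit is a hypothetical stand-in for a splittable unit's Split method.
// It returns nil slices with an error, empty (non-nil) slices when the split
// did not succeed, and non-empty slices on success.
func trySplit(frac float64) (primaries, residuals []string, err error) {
	switch {
	case frac < 0 || frac > 1:
		return nil, nil, errors.New("invalid split fraction")
	case frac == 1:
		// Nothing would be handed off, so the split is unsuccessful.
		return []string{}, []string{}, nil
	default:
		return []string{"primary"}, []string{"residual"}, nil
	}
}

// splitSucceeded mirrors the caller-side check: if either side is missing,
// the split is treated as failed regardless of the reason.
func splitSucceeded(ps, rs []string) bool {
	return len(ps) > 0 && len(rs) > 0
}

func main() {
	ps, rs, err := trySplit(0.5)
	fmt.Println(splitSucceeded(ps, rs), err)
}
```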







[GitHub] [beam] lostluck commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503570217



##########
File path: sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
##########
@@ -103,11 +103,11 @@ func TestDynamicSplit(t *testing.T) {
 			// with the input coder to the path.
 			// TODO(BEAM-10579) Switch to using splittable unit's input coder
 			// once that is implemented.
-			p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
+			p, err := decodeDynSplitElm(splitRes.split.PS[0], cdr)

Review comment:
       Go, being a simple language, would have you put the `if` check inline. It would be a waste to wrap it in a function unless the pattern repeats, at least not until generics land and we can define a `GetOnlyElement[T any](slice []T) T { ... }` helper.
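For reference, a sketch of the kind of helper mentioned above, written with the type-parameter syntax that later shipped in Go 1.18. `getOnlyElement` is hypothetical (not a Beam or standard-library function). This version returns an error rather than panicking, so callers such as tests decide how to fail.

```go
package main

import "fmt"

// getOnlyElement is a hypothetical helper requiring Go 1.18+ generics. It
// fails unless the slice holds exactly one element, so extra elements are
// never silently dropped.
func getOnlyElement[T any](s []T) (T, error) {
	var zero T
	if len(s) != 1 {
		return zero, fmt.Errorf("expected exactly 1 element, got %d", len(s))
	}
	return s[0], nil
}

func main() {
	v, err := getOnlyElement([]string{"only"})
	fmt.Println(v, err)
}
```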







[GitHub] [beam] lostluck commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r502727024



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -338,12 +348,17 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		// If we need to process the element in multiple windows, each one needs
 		// its own RTracker and progress must be tracked among all windows by
 		// currW updated between processing.
-		for _, w := range elm.Windows {
+		n.numW = len(elm.Windows)
+
+		//for _, w := range elm.Windows {

Review comment:
       Remove the commented-out line, OR swap the line below it for one that doesn't ignore the index with `_`. As far as I can read it,
   
   `for i, w := range elm.Windows {` should be identical and let you remove `w := elm.Windows[i]`.







[GitHub] [beam] lostluck commented on pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
lostluck commented on pull request #13070:
URL: https://github.com/apache/beam/pull/13070#issuecomment-706461203


   Run Go PostCommit





[GitHub] [beam] youngoli commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503530579



##########
File path: sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
##########
@@ -103,11 +103,11 @@ func TestDynamicSplit(t *testing.T) {
 			// with the input coder to the path.
 			// TODO(BEAM-10579) Switch to using splittable unit's input coder
 			// once that is implemented.
-			p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
+			p, err := decodeDynSplitElm(splitRes.split.PS[0], cdr)

Review comment:
       That would be useful. @lostluck, do you know of anything? If not, I can explicitly check that it has a length of exactly one, since that is an expectation of the test.
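Without generics, the explicit length check might look like the sketch below. The `[][]byte` primaries and the `decode` callback are hypothetical stand-ins for `splitRes.split.PS` and `decodeDynSplitElm`; in the actual test the error path would more naturally be a `t.Fatalf`.

```go
package main

import "fmt"

// decodeOnly checks the length inline before indexing, so an unexpected
// extra primary is reported instead of silently dropped.
func decodeOnly(ps [][]byte, decode func([]byte) (string, error)) (string, error) {
	if got, want := len(ps), 1; got != want {
		return "", fmt.Errorf("got %d primaries, want %d", got, want)
	}
	return decode(ps[0])
}

func main() {
	// A trivial decoder stands in for the real coder-based decoding.
	decode := func(b []byte) (string, error) { return string(b), nil }
	p, err := decodeOnly([][]byte{[]byte("elm")}, decode)
	fmt.Println(p, err)
}
```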







[GitHub] [beam] lostluck commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503569540



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -354,12 +354,12 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
 		return SplitResult{PI: s - 1, RI: s}, nil
 	}
 	// Otherwise, perform a sub-element split.
-	p, r, err := su.Split(fr)
+	ps, rs, err := su.Split(fr)
 	if err != nil {
 		return SplitResult{}, err
 	}
 
-	if p == nil || r == nil { // Unsuccessful split.
+	if len(ps) == 0 || len(rs) == 0 { // Unsuccessful split.

Review comment:
       Idiomatically, if there's no intended real semantic difference between nil and empty, then checking the length is the correct way to handle it; nil slices also have length 0.
   This also catches the error case where empty slices are generated, and hints to the compiler that there's at least 1 entry in the slices after this if block.
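A small sketch of the idiom: nil and empty slices compare differently against nil, but both have length 0, so one `len` check handles both, and indexing element 0 after the check cannot go out of range (the compiler's bounds-check elimination may also use this fact). The helper name `first` is illustrative only.

```go
package main

import "fmt"

// first reports the first element and whether one exists. len(s) == 0 is
// true for both a nil slice and an empty, non-nil slice.
func first(s []int) (int, bool) {
	if len(s) == 0 {
		return 0, false
	}
	// After the length check, s[0] is known to be in range.
	return s[0], true
}

func main() {
	var nilSlice []int    // nil: no backing array
	emptySlice := []int{} // non-nil, but zero length

	fmt.Println(nilSlice == nil, emptySlice == nil)       // true false
	fmt.Println(len(nilSlice) == 0, len(emptySlice) == 0) // true true
	fmt.Println(first([]int{42}))                         // 42 true
}
```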







[GitHub] [beam] robertwb commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503484424



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -354,12 +354,12 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
 		return SplitResult{PI: s - 1, RI: s}, nil
 	}
 	// Otherwise, perform a sub-element split.
-	p, r, err := su.Split(fr)
+	ps, rs, err := su.Split(fr)
 	if err != nil {
 		return SplitResult{}, err
 	}
 
-	if p == nil || r == nil { // Unsuccessful split.
+	if len(ps) == 0 || len(rs) == 0 { // Unsuccessful split.

Review comment:
       Nit: nil seemed more explicit, why was this changed? (On that note, if one is empty must the other be empty as well, or is it OK to have one non-empty and treat that as an unsuccessful split?)

##########
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
##########
@@ -180,8 +180,8 @@ func (tracker *Tracker) TrySplit(fraction float64) (primary, residual interface{
 
 // GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
 func (tracker *Tracker) GetProgress() (done, remaining float64) {
-	done = float64(tracker.claimed - tracker.rest.Start)
-	remaining = float64(tracker.rest.End - tracker.claimed)
+	done = float64((tracker.claimed + 1) - tracker.rest.Start)

Review comment:
       Was this an existing bug?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
##########
@@ -103,11 +103,11 @@ func TestDynamicSplit(t *testing.T) {
 			// with the input coder to the path.
 			// TODO(BEAM-10579) Switch to using splittable unit's input coder
 			// once that is implemented.
-			p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
+			p, err := decodeDynSplitElm(splitRes.split.PS[0], cdr)

Review comment:
       Is there a convention here you could use to assert that there's only one element while getting it, rather than let any (unexpected?) other elements in the list be dropped? 







[GitHub] [beam] youngoli commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503530815



##########
File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
##########
@@ -180,8 +180,8 @@ func (tracker *Tracker) TrySplit(fraction float64) (primary, residual interface{
 
 // GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
 func (tracker *Tracker) GetProgress() (done, remaining float64) {
-	done = float64(tracker.claimed - tracker.rest.Start)
-	remaining = float64(tracker.rest.End - tracker.claimed)
+	done = float64((tracker.claimed + 1) - tracker.rest.Start)

Review comment:
       Yeah, it's something I missed because I was only testing this code with large ranges, and it's most noticeable with small ranges.
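A simplified sketch of the corrected progress computation. `Restriction` and `Tracker` here are cut-down stand-ins for the offsetrange types, the not-yet-claimed case is ignored, and the `remaining` line is an assumption that mirrors the same `+1` adjustment, since the diff excerpt above only shows the new `done` line. On a small range such as [0, 4), the old formula reports no progress after the first claim, which is why the bug shows up most with small ranges.

```go
package main

import "fmt"

// Restriction and Tracker are simplified stand-ins for the offsetrange
// types; only the fields GetProgress needs are included.
type Restriction struct{ Start, End int64 }

type Tracker struct {
	rest    Restriction
	claimed int64 // last successfully claimed offset
}

// GetProgress counts the claimed offset itself as done. Assumption: the
// remaining side is adjusted by the same +1.
func (tracker *Tracker) GetProgress() (done, remaining float64) {
	done = float64((tracker.claimed + 1) - tracker.rest.Start)
	remaining = float64(tracker.rest.End - (tracker.claimed + 1))
	return done, remaining
}

func main() {
	// Range [0, 4) with offset 0 claimed: the old formula reported 0 done
	// and 4 remaining; counting the claimed offset gives 1 done, 3 remaining.
	tr := &Tracker{rest: Restriction{Start: 0, End: 4}, claimed: 0}
	fmt.Println(tr.GetProgress()) // 1 3
}
```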







[GitHub] [beam] youngoli commented on a change in pull request #13070: [BEAM-11020] Adding multi-window splitting to Go SDF.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13070:
URL: https://github.com/apache/beam/pull/13070#discussion_r503526693



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -338,12 +348,17 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		// If we need to process the element in multiple windows, each one needs
 		// its own RTracker and progress must be tracked among all windows by
 		// currW updated between processing.
-		for _, w := range elm.Windows {
+		n.numW = len(elm.Windows)
+
+		//for _, w := range elm.Windows {

Review comment:
       Removed the commented out line. It's actually not identical because a dynamic split can modify n.numW if some of the windows are split with the residual, which is why I loop using numW instead. I could theoretically loop based on `len(elm.Windows)` instead, avoiding the `range` keyword, and directly trim the windows slice in the current element, but I don't know if there's any advantage to doing it that way and this seems simpler.
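A simplified, single-goroutine sketch of why the loop re-reads `numW` instead of using `range`: a `range` clause evaluates its slice once, before the loop starts, so it would keep visiting windows that a split has already handed to the residual. The `processor` type and `splitAfterCurrent` are hypothetical; the real ProcessSizedElementsAndRestrictions also has to coordinate with splits that arrive concurrently.

```go
package main

import "fmt"

// processor is a hypothetical stand-in: numW is how many windows this
// bundle still owns, and a split may lower it mid-processing.
type processor struct {
	numW      int
	processed []string
}

// splitAfterCurrent simulates a dynamic split that hands every window
// after currW to the residual.
func (p *processor) splitAfterCurrent(currW int) { p.numW = currW + 1 }

// process handles each owned window; splitAt < 0 means no split happens.
func (p *processor) process(windows []string, splitAt int) {
	p.numW = len(windows)
	// Re-reading p.numW each iteration lets a split shrink the loop;
	// `for i, w := range windows` would still visit all windows.
	for i := 0; i < p.numW; i++ {
		w := windows[i]
		p.processed = append(p.processed, w)
		if i == splitAt {
			p.splitAfterCurrent(i)
		}
	}
}

func main() {
	p := &processor{}
	p.process([]string{"w0", "w1", "w2"}, 0)
	fmt.Println(p.processed) // [w0]
}
```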



