Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/05 20:47:21 UTC

[GitHub] [beam] youngoli opened a new pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

youngoli opened a new pull request #13013:
URL: https://github.com/apache/beam/pull/13013


   The SDF runner now also keeps track of the window currently being processed,
   so when GetProgress is called on the SplittableUnit it can calculate progress
   relative to all of the element's windows.
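
One way to picture window-relative progress is the sketch below. It is not the code from this pull request: `windowedProgress` and its parameters are hypothetical names, and it assumes that every window of the element carries the same amount of work as the window currently being processed.

```go
package main

import "fmt"

// windowedProgress is a hypothetical helper, not the Beam implementation.
// done and remaining describe the restriction in the window currently being
// processed, currW is that window's zero-based index, and numW is the total
// number of windows on the element. Assumption: every window carries the
// same amount of work as the current one.
func windowedProgress(done, remaining float64, currW, numW int) float64 {
	perWindow := done + remaining
	if perWindow == 0 || numW <= 0 {
		return 0
	}
	// Windows before currW are fully done; windows after it are untouched.
	totalDone := float64(currW)*perWindow + done
	total := float64(numW) * perWindow
	return totalDone / total
}

func main() {
	fmt.Println(windowedProgress(1.0, 1.0, 0, 1)) // single window: 0.5
	fmt.Println(windowedProgress(1.0, 1.0, 1, 4)) // second of four windows: 0.375
}
```

With a single window the result reduces to the tracker's own fraction, so under this assumption the single-window behavior is unchanged.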
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13013:
URL: https://github.com/apache/beam/pull/13013#discussion_r499939012



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -134,40 +134,40 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 	// If the function observes windows, we must invoke it for each window. The expected fast path
 	// is that either there is a single window or the function doesn't observes windows.

Review comment:
       Consider amending the comment here to make the optimization explicit, e.g.
   "...doesn't observe windows, meaning they can be computed simultaneously as a single window."
   
   Mostly because processSingleWindow is very explicit about being for one window only, so this could otherwise read as a bug.
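
The fast path discussed in this comment can be sketched roughly as follows. This is an illustration only: `Window` and `invocations` are hypothetical stand-ins, not the types or functions in pardo.go, and the sketch only counts how the windows would be grouped into calls.

```go
package main

import "fmt"

// Window is a hypothetical stand-in for an interval window.
type Window struct{ Start, End int64 }

// invocations returns the groups of windows a function would be invoked with:
// one group per window when the function observes windows, otherwise a single
// group holding all windows (the fast path).
func invocations(observesWindows bool, windows []Window) [][]Window {
	if !observesWindows || len(windows) <= 1 {
		// Fast path: the function cannot tell the windows apart, so one
		// call can stand in for all of them.
		return [][]Window{windows}
	}
	groups := make([][]Window, 0, len(windows))
	for _, w := range windows {
		groups = append(groups, []Window{w})
	}
	return groups
}

func main() {
	ws := []Window{{10, 20}, {11, 21}, {12, 22}, {13, 23}}
	fmt.Println(len(invocations(false, ws))) // 1
	fmt.Println(len(invocations(true, ws)))  // 4
}
```

Under this reading, a single-window helper that receives several windows is not a bug, because the non-observing function treats them as one.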







[GitHub] [beam] youngoli commented on a change in pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13013:
URL: https://github.com/apache/beam/pull/13013#discussion_r500671989



##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -134,40 +134,40 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 	// If the function observes windows, we must invoke it for each window. The expected fast path
 	// is that either there is a single window or the function doesn't observes windows.

Review comment:
       Done, added some comments making this optimization more explicit.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf_test.go
##########
@@ -400,6 +400,90 @@ func TestAsSplittableUnit(t *testing.T) {
 	if err != nil {
 		t.Fatalf("invalid function: %v", err)
 	}
+	//wdfn, err := graph.NewDoFn(&VetWindowSdf{}, graph.NumMainInputs(graph.MainSingle))

Review comment:
       Done.







[GitHub] [beam] lostluck commented on a change in pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13013:
URL: https://github.com/apache/beam/pull/13013#discussion_r499941678



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf_test.go
##########
@@ -400,6 +400,90 @@ func TestAsSplittableUnit(t *testing.T) {
 	if err != nil {
 		t.Fatalf("invalid function: %v", err)
 	}
+	//wdfn, err := graph.NewDoFn(&VetWindowSdf{}, graph.NumMainInputs(graph.MainSingle))

Review comment:
       Remove the commented-out code.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf_test.go
##########
@@ -400,6 +400,90 @@ func TestAsSplittableUnit(t *testing.T) {
 	if err != nil {
 		t.Fatalf("invalid function: %v", err)
 	}
+	//wdfn, err := graph.NewDoFn(&VetWindowSdf{}, graph.NumMainInputs(graph.MainSingle))
+	//if err != nil {
+	//	t.Fatalf("invalid function: %v", err)
+	//}
+	multiWindows := []typex.Window{
+		window.IntervalWindow{Start: 10, End: 20},
+		window.IntervalWindow{Start: 11, End: 21},
+		window.IntervalWindow{Start: 12, End: 22},
+		window.IntervalWindow{Start: 13, End: 23},
+	}
+
+	// Test that progress returns expected results and respects windows.
+	t.Run("Progress", func(t *testing.T) {
+		tests := []struct {
+			name          string
+			fn            *graph.DoFn
+			in            FullValue
+			doneWork      float64 // Will be output by RTracker's GetProgress.
+			remainingWork float64 // Will be output by RTracker's GetProgress.
+			currWindow    int
+			wantProgress  float64
+		}{
+			{
+				name: "SingleWindow",
+				fn:   dfn,
+				in: FullValue{
+					Elm: &FullValue{
+						Elm:  1,
+						Elm2: &VetRestriction{ID: "Sdf"},
+					},
+					Elm2:      1.0,
+					Timestamp: testTimestamp,
+					Windows:   testWindows,
+				},
+				doneWork:      1.0,
+				remainingWork: 1.0,
+				currWindow:    0,
+				wantProgress:  0.5,
+			},
+			{
+				name: "MultipleWindows",
+				fn:   dfn,

Review comment:
       VetSdf doesn't observe windows (either explicitly or implicitly via a side input), so I'm not sure this test case actually checks the code?







[GitHub] [beam] youngoli merged pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #13013:
URL: https://github.com/apache/beam/pull/13013


   





[GitHub] [beam] youngoli commented on pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #13013:
URL: https://github.com/apache/beam/pull/13013#issuecomment-703879837


   R: @lostluck 





[GitHub] [beam] youngoli commented on a change in pull request #13013: [BEAM-11019] Fixing Go SDF progress reporting with multiple windows.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13013:
URL: https://github.com/apache/beam/pull/13013#discussion_r500674070



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf_test.go
##########
@@ -400,6 +400,90 @@ func TestAsSplittableUnit(t *testing.T) {
 	if err != nil {
 		t.Fatalf("invalid function: %v", err)
 	}
+	//wdfn, err := graph.NewDoFn(&VetWindowSdf{}, graph.NumMainInputs(graph.MainSingle))
+	//if err != nil {
+	//	t.Fatalf("invalid function: %v", err)
+	//}
+	multiWindows := []typex.Window{
+		window.IntervalWindow{Start: 10, End: 20},
+		window.IntervalWindow{Start: 11, End: 21},
+		window.IntervalWindow{Start: 12, End: 22},
+		window.IntervalWindow{Start: 13, End: 23},
+	}
+
+	// Test that progress returns expected results and respects windows.
+	t.Run("Progress", func(t *testing.T) {
+		tests := []struct {
+			name          string
+			fn            *graph.DoFn
+			in            FullValue
+			doneWork      float64 // Will be output by RTracker's GetProgress.
+			remainingWork float64 // Will be output by RTracker's GetProgress.
+			currWindow    int
+			wantProgress  float64
+		}{
+			{
+				name: "SingleWindow",
+				fn:   dfn,
+				in: FullValue{
+					Elm: &FullValue{
+						Elm:  1,
+						Elm2: &VetRestriction{ID: "Sdf"},
+					},
+					Elm2:      1.0,
+					Timestamp: testTimestamp,
+					Windows:   testWindows,
+				},
+				doneWork:      1.0,
+				remainingWork: 1.0,
+				currWindow:    0,
+				wantProgress:  0.5,
+			},
+			{
+				name: "MultipleWindows",
+				fn:   dfn,

Review comment:
       This test doesn't check the code in ProcessElement because it never calls ProcessElement, so it doesn't matter which DoFn is used (in fact, I can probably remove that field from the tests). It's unit testing the SplittableUnit behavior specifically.
   
   I think I can add a unit test to make sure ProcessElement handles windows correctly (technically a component test, since it exercises multiple methods), but it's a little more involved because I need to figure out how to block processing in order to check progress and perform splits. I'll add it in the upcoming PR that enables multi-window splitting.






