You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/18 22:19:30 UTC

[GitHub] [beam] lostluck commented on a change in pull request #15743: [BEAM-11087] Add default WindowMappingFn from Main to Side Input windows, validation test, unit tests

lostluck commented on a change in pull request #15743:
URL: https://github.com/apache/beam/pull/15743#discussion_r731335377



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())

Review comment:
       fmt is aware of the Stringer interface, so you don't need to explicitly call a `String()` method on a value you pass to a fmt call.
   ```suggestion
   			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode)
   ```

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())

Review comment:
       The property that prevents session windows from working properly is that it's a merging WindowFn. We can't check this bit directly right now, but we can begin to educate users about it.
   
   "error with SideInput %d: PCollections using Merging WindowFns are not supported as side inputs"
   
   Or something similar. That message also identifies the `i`th side input parameter as the problem, but we can do better.
   
   We should also add further context: 
   Which user DoFn was the user adding to the graph, that has the bad side input? 
   
   We can also suggest that users re-window the PCollection to a non-merging WindowFn before using it as a side input. As a rule, users love it when the error that impedes them suggests a solution to the problem.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())
+		}
+		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
+			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n.String(), sideNode.String())

Review comment:
       ```suggestion
   			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n, sideNode)
   ```

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.

Review comment:
       We likely should get rid of the commentary here, since everything now works. ;)

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -93,6 +94,47 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
+// ValidateWindowedSideInputs checks that side inputs have accurate windowing information when used.
+func ValidateWindowedSideInputs(s beam.Scope) {
+	timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+	timestampedSide := beam.ParDo(s, &createTimestampedData{Data: []int{1, 2, 3}}, beam.Impulse(s))
+
+	timestampedData = beam.DropKey(s, timestampedData)
+	timestampedSide = beam.DropKey(s, timestampedSide)
+
+	_ = timestampedSide
+
+	windowSize := 1 * time.Second
+
+	validateSums := func(s beam.Scope, wfn, sideFn *window.Fn, in, side beam.PCollection, expected ...interface{}) {
+		wData := beam.WindowInto(s, wfn, in)
+		wSide := beam.WindowInto(s, sideFn, side)
+
+		sums := beam.ParDo(s, sumSideInputs, wData, beam.SideInput{Input: wSide})
+
+		sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+
+		passert.Equals(s, sums, expected...)
+	}
+
+	// This works.
+	validateSums(s.Scope("Fixed-Global"), window.NewFixedWindows(windowSize), window.NewGlobalWindows(), timestampedData, timestampedData, 7, 8, 9)
+	// So does this.
+	validateSums(s.Scope("Fixed-Same"), window.NewFixedWindows(windowSize), window.NewFixedWindows(windowSize), timestampedData, timestampedData, 2, 4, 6)
+
+	// Thise doesn't
+	validateSums(s.Scope("Fixed-Big"), window.NewFixedWindows(windowSize), window.NewFixedWindows(10*time.Second), timestampedData, timestampedSide, 7, 8, 9)

Review comment:
       Please also add Sliding-Fixed and Fixed-Sliding cases, so we can be sure the code gets exercised fully.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -49,7 +50,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 	}
 
 	in := []*graph.Node{col.n}
+	inWfn := col.n.WindowingStrategy().Fn
 	for _, s := range side {
+		sideNode := s.Input.n
+		sideWfn := sideNode.WindowingStrategy().Fn
+		if sideWfn.Kind == window.Sessions {
+			return nil, fmt.Errorf("side input %v is session-windowed, which is not supported", sideNode.String())
+		}
+		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
+			return nil, fmt.Errorf("main input %v is global windowed but side input %v is not, cannot map windows correctly", col.n.String(), sideNode.String())

Review comment:
       The same improvements apply to this error message too: name the DoFn and say which side input parameter had the problem.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org