You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/30 00:09:21 UTC

[GitHub] [beam] lostluck commented on a change in pull request #12124: [BEAM-10312] Sub-element progress accounted for during splitting.

lostluck commented on a change in pull request #12124:
URL: https://github.com/apache/beam/pull/12124#discussion_r447317094



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -302,12 +302,23 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
 	}
 
 	n.mu.Lock()
+	currProg := 0.0  // Current element progress.

Review comment:
       Extremely minor nit: since the actual value here doesn't matter, only the type, consider `var currProg float64` instead, to make it clearer that the value is notionally uninitialized.
   
   There's an argument to be made that the current form lets the variable get its type from the rt.GetProgress() call, which is a bit more flexible, but that benefits the writer more than the reader.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -319,39 +330,85 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
 }
 
 // splitHelper is a helper function that finds a split point in a range.
-// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
-// and represent the start and end of the splittable range respectively. splits
-// is an optional slice of valid split indices, and if nil then all indices are
-// considered valid split points. frac must be between [0, 1], and represents
-// a fraction of the remaining work that the split point aims to be as close
-// as possible to.
-func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+//
+// currIdx and endIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively.
+//
+// currProg represents the progress through the current element (currIdx).
+//
+// splits is an optional slice of valid split indices, and if nil then all
+// indices are considered valid split points.
+//
+// frac must be between [0, 1], and represents a fraction of the remaining work
+// that the split point aims to be as close as possible to.
+//
+// splittable indicates that sub-element splitting is possible (i.e. the next
+// unit is an SDF).
+//
+// Returns the element index to split at (first element of residual), and the
+// fraction within that element to split, iff the split point is the current
+// element, the splittable param is set to true, and both the element being
+// split and the following element are valid split points.
+func splitHelper(
+	currIdx, endIdx int64,
+	currProg float64,
+	splits []int64,
+	frac float64,
+	splittable bool) (int64, float64, error) {
 	// Get split index from fraction. Find the closest index to the fraction of
 	// the remainder.
-	var start int64 = 0
-	if currIdx > start {
-		start = currIdx
-	}
-	// This is the first valid split index, since we should never split at 0 or
-	// at the current element.
-	safeStart := start + 1
-	// The remainder starts at our actual progress (i.e. start), but our final
-	// split index has to be >= our safeStart.
-	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
-	if fracIdx < safeStart {
-		fracIdx = safeStart
+	start := float64(currIdx) + currProg
+	safeStart := currIdx + 1 // safeStart avoids splitting at 0, or <= currIdx
+	if safeStart <= 0 {
+		safeStart = 1
 	}
+	var splitFloat = start + frac*(float64(endIdx)-start)
+
 	if len(splits) == 0 {
-		// All split points are valid so just split at fraction.
-		return fracIdx, nil
+		if splittable && int64(splitFloat) == currIdx {
+			// Sub-element splitting is valid here, so return fraction.
+			_, f := math.Modf(splitFloat)
+			return int64(splitFloat), f, nil
+		} else {
+			// All split points are valid so just split at safe index closest to
+			// fraction.
+			splitIdx := int64(math.Round(splitFloat))
+			if splitIdx < safeStart {
+				splitIdx = safeStart
+			}
+			return splitIdx, 0.0, nil
+		}
 	} else {

Review comment:
       By extension: unindenting the nested else right above makes it clear that entering that if clause is terminal, so also remove this else clause and unindent its body. That brings the rest of the function back to a single level of indentation.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##########
@@ -494,11 +501,50 @@ func TestSplitHelper(t *testing.T) {
 		for _, test := range tests {
 			test := test
 			t.Run(fmt.Sprintf("(%v of [%v, %v])", test.frac, test.curr, test.size), func(t *testing.T) {
-				got, err := splitHelper(test.curr, test.size, nil, test.frac)
+				wantFrac := 0.0
+				got, gotFrac, err := splitHelper(test.curr, test.size, 0.0, nil, test.frac, false)
+				if err != nil {
+					t.Errorf("error in splitHelper: %v", err)
+				} else {
+					if got != test.want {
+						t.Errorf("incorrect split point: got: %v, want: %v", got, test.want)
+					}
+					if !floatEquals(gotFrac, wantFrac, eps) {
+						t.Errorf("incorrect split fraction: got: %v, want: %v", gotFrac, wantFrac)
+					}
+				}
+			})
+		}
+	})
+
+	t.Run("WithElementProgress", func(t *testing.T) {
+		tests := []struct {
+			curr, size int64
+			currProg   float64
+			frac       float64
+			want       int64
+		}{
+			// Progress into the active element influences where the split of
+			// the remainder falls.
+			{curr: 0, currProg: 0.5, size: 4, frac: 0.25, want: 1},
+			{curr: 0, currProg: 0.9, size: 4, frac: 0.25, want: 2},
+			{curr: 1, currProg: 0.0, size: 4, frac: 0.25, want: 2},
+			{curr: 1, currProg: 0.1, size: 4, frac: 0.25, want: 2},
+		}
+		for _, test := range tests {
+			test := test
+			t.Run(fmt.Sprintf("(%v of [%v, %v])", test.frac, float64(test.curr)+test.currProg, test.size), func(t *testing.T) {
+				wantFrac := 0.0
+				got, gotFrac, err := splitHelper(test.curr, test.size, test.currProg, nil, test.frac, false)
 				if err != nil {
 					t.Errorf("error in splitHelper: %v", err)
-				} else if got != test.want {
-					t.Errorf("incorrect split point: got: %v, want: %v", got, test.want)
+				} else {

Review comment:
       Same else comment here. In this case, use t.Fatalf in the err != nil case instead of t.Errorf to get the desired behaviour.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -319,39 +330,85 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
 }
 
 // splitHelper is a helper function that finds a split point in a range.
-// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
-// and represent the start and end of the splittable range respectively. splits
-// is an optional slice of valid split indices, and if nil then all indices are
-// considered valid split points. frac must be between [0, 1], and represents
-// a fraction of the remaining work that the split point aims to be as close
-// as possible to.
-func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+//
+// currIdx and endIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively.
+//
+// currProg represents the progress through the current element (currIdx).
+//
+// splits is an optional slice of valid split indices, and if nil then all
+// indices are considered valid split points.
+//
+// frac must be between [0, 1], and represents a fraction of the remaining work
+// that the split point aims to be as close as possible to.
+//
+// splittable indicates that sub-element splitting is possible (i.e. the next
+// unit is an SDF).
+//
+// Returns the element index to split at (first element of residual), and the
+// fraction within that element to split, iff the split point is the current
+// element, the splittable param is set to true, and both the element being
+// split and the following element are valid split points.
+func splitHelper(
+	currIdx, endIdx int64,
+	currProg float64,
+	splits []int64,
+	frac float64,
+	splittable bool) (int64, float64, error) {
 	// Get split index from fraction. Find the closest index to the fraction of
 	// the remainder.
-	var start int64 = 0
-	if currIdx > start {
-		start = currIdx
-	}
-	// This is the first valid split index, since we should never split at 0 or
-	// at the current element.
-	safeStart := start + 1
-	// The remainder starts at our actual progress (i.e. start), but our final
-	// split index has to be >= our safeStart.
-	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
-	if fracIdx < safeStart {
-		fracIdx = safeStart
+	start := float64(currIdx) + currProg
+	safeStart := currIdx + 1 // safeStart avoids splitting at 0, or <= currIdx
+	if safeStart <= 0 {
+		safeStart = 1
 	}
+	var splitFloat = start + frac*(float64(endIdx)-start)
+
 	if len(splits) == 0 {
-		// All split points are valid so just split at fraction.
-		return fracIdx, nil
+		if splittable && int64(splitFloat) == currIdx {
+			// Sub-element splitting is valid here, so return fraction.
+			_, f := math.Modf(splitFloat)
+			return int64(splitFloat), f, nil
+		} else {
+			// All split points are valid so just split at safe index closest to
+			// fraction.
+			splitIdx := int64(math.Round(splitFloat))
+			if splitIdx < safeStart {
+				splitIdx = safeStart
+			}
+			return splitIdx, 0.0, nil
+		}
 	} else {
-		// Find the closest unprocessed split point to our fraction.
 		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
-		var prevDiff int64 = math.MaxInt64
+
+		if splittable && int64(splitFloat) == currIdx {
+			// Check valid split points to see if we can do a sub-element split.
+			// We need to find the currIdx and currIdx + 1 for it to be valid.
+			c, cp1 := false, false
+			for _, s := range splits {
+				if s == currIdx {
+					c = true
+				} else if s == currIdx+1 {
+					cp1 = true
+					break
+				} else if s > currIdx+1 {
+					break
+				}
+			}
+			if c && cp1 {
+				// Sub-element splitting is valid here, so return fraction.
+				_, f := math.Modf(splitFloat)
+				return int64(splitFloat), f, nil
+			}
+		}
+
+		//For non-sub-element splitting, find the closest unprocessed split

Review comment:
       ```suggestion
   		// For non-sub-element splitting, find the closest unprocessed split
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##########
@@ -528,24 +574,109 @@ func TestSplitHelper(t *testing.T) {
 		for _, test := range tests {
 			test := test
 			t.Run(fmt.Sprintf("(%v of [%v, %v], splits = %v)", test.frac, test.curr, test.size, test.splits), func(t *testing.T) {
-				got, err := splitHelper(test.curr, test.size, test.splits, test.frac)
+				wantFrac := 0.0
+				got, gotFrac, err := splitHelper(test.curr, test.size, 0.0, test.splits, test.frac, false)
 				if test.err {
 					if err == nil {
 						t.Errorf("splitHelper should have errored, instead got: %v", got)
 					}
+					return
+				}
+				if err != nil {
+					t.Errorf("error in splitHelper: %v", err)
+				} else {

Review comment:
       Same comment here, since t.Errorf can become t.Fatalf.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -319,39 +330,85 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
 }
 
 // splitHelper is a helper function that finds a split point in a range.
-// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
-// and represent the start and end of the splittable range respectively. splits
-// is an optional slice of valid split indices, and if nil then all indices are
-// considered valid split points. frac must be between [0, 1], and represents
-// a fraction of the remaining work that the split point aims to be as close
-// as possible to.
-func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+//
+// currIdx and endIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively.
+//
+// currProg represents the progress through the current element (currIdx).
+//
+// splits is an optional slice of valid split indices, and if nil then all
+// indices are considered valid split points.
+//
+// frac must be between [0, 1], and represents a fraction of the remaining work
+// that the split point aims to be as close as possible to.
+//
+// splittable indicates that sub-element splitting is possible (i.e. the next
+// unit is an SDF).
+//
+// Returns the element index to split at (first element of residual), and the
+// fraction within that element to split, iff the split point is the current
+// element, the splittable param is set to true, and both the element being
+// split and the following element are valid split points.
+func splitHelper(
+	currIdx, endIdx int64,
+	currProg float64,
+	splits []int64,
+	frac float64,
+	splittable bool) (int64, float64, error) {
 	// Get split index from fraction. Find the closest index to the fraction of
 	// the remainder.
-	var start int64 = 0
-	if currIdx > start {
-		start = currIdx
-	}
-	// This is the first valid split index, since we should never split at 0 or
-	// at the current element.
-	safeStart := start + 1
-	// The remainder starts at our actual progress (i.e. start), but our final
-	// split index has to be >= our safeStart.
-	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
-	if fracIdx < safeStart {
-		fracIdx = safeStart
+	start := float64(currIdx) + currProg
+	safeStart := currIdx + 1 // safeStart avoids splitting at 0, or <= currIdx
+	if safeStart <= 0 {
+		safeStart = 1
 	}
+	var splitFloat = start + frac*(float64(endIdx)-start)
+
 	if len(splits) == 0 {
-		// All split points are valid so just split at fraction.
-		return fracIdx, nil
+		if splittable && int64(splitFloat) == currIdx {
+			// Sub-element splitting is valid here, so return fraction.
+			_, f := math.Modf(splitFloat)
+			return int64(splitFloat), f, nil
+		} else {
+			// All split points are valid so just split at safe index closest to
+			// fraction.
+			splitIdx := int64(math.Round(splitFloat))
+			if splitIdx < safeStart {
+				splitIdx = safeStart
+			}
+			return splitIdx, 0.0, nil
+		}

Review comment:
       Idiomatic nit: since the previous if returns, we don't need the else clause; remove the else and unindent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org