Posted to github@beam.apache.org by "jrmccluskey (via GitHub)" <gi...@apache.org> on 2023/07/31 14:20:39 UTC

[GitHub] [beam] jrmccluskey commented on a diff in pull request #27737: [prism] Fusion base, reshuffle, cogbk.

jrmccluskey commented on code in PR #27737:
URL: https://github.com/apache/beam/pull/27737#discussion_r1279335861


##########
sdks/go/pkg/beam/runners/prism/internal/handlerunner.go:
##########
@@ -63,13 +64,72 @@ func (*runner) ConfigCharacteristic() reflect.Type {
 	return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
 }
 
+var _ transformPreparer = (*runner)(nil)
+
+func (*runner) PrepareUrns() []string {
+	return []string{urns.TransformReshuffle}
+}
+
+// PrepareTransform handles special processing with respect to runner transforms, like reshuffle.
+func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	// TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.
+	// TODO: Implement a fusion break for reshuffles.
+
+	if h.config.SDKReshuffle {
+		panic("SDK side reshuffle not yet supported")
+	}
+
+	// A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness.
+	// It could however affect performance, so it exists to tell the runner that this
+	// point in the pipeline needs a fusion break, to enable the pipeline to change its
+	// degree of parallelism.
+	//
+	// The change of parallelism goes both ways. It could allow for larger batch sizes, or
+	// enable smaller batch sizes downstream if it is in fact parallelizable.
+	//
+	// But for a single transform node per stage runner, we can elide it entirely,
+	// since the input collection and output collection types match.
+
+	// Get the input and output PCollections, there should only be 1 each.
+	if len(t.GetOutputs()) != 1 {
+		panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+	}
+	if len(t.GetOutputs()) != 1 {
+		panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+	}

Review Comment:
   Looks like you missed the input check here (both checks look at GetOutputs()) and didn't update the panic string for each one.
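For reference, here is a minimal sketch of what the suggested fix might look like, paired with the elision that the diff's comments describe. It assumes a hypothetical transform struct standing in for pipepb.PTransform (only the input/output maps of local name to PCollection ID are modeled). The helpers singleID and elideReshuffle are also hypothetical and are not part of prism. The sketch returns errors instead of panicking only so it is easy to exercise; the code under review panics.

```go
package main

import (
	"fmt"
	"sort"
)

// transform is a hypothetical stand-in for pipepb.PTransform, keeping only
// the input and output maps (local name -> PCollection ID).
type transform struct {
	Inputs  map[string]string
	Outputs map[string]string
}

// singleID returns the only PCollection ID in m. The error names which side
// ("input" or "output") failed, so each check reports its own message.
func singleID(side string, m map[string]string) (string, error) {
	if len(m) != 1 {
		return "", fmt.Errorf("expected single %s PCollection in reshuffle, got %d", side, len(m))
	}
	for _, id := range m {
		return id, nil
	}
	return "", nil // unreachable: len(m) == 1
}

// elideReshuffle checks the input and the output separately, then rewires every
// consumer of the reshuffle's output to read the reshuffle's input instead.
// This is valid only because both PCollections have the same type. Consumer
// input maps are mutated in place; the sorted names of rewired consumers are
// returned. Removing the reshuffle transform itself is left out of the sketch.
func elideReshuffle(t transform, consumers map[string]transform) ([]string, error) {
	inID, err := singleID("input", t.Inputs)
	if err != nil {
		return nil, err
	}
	outID, err := singleID("output", t.Outputs)
	if err != nil {
		return nil, err
	}
	var rewired []string
	for name, c := range consumers {
		hit := false
		for local, pcol := range c.Inputs {
			if pcol == outID {
				c.Inputs[local] = inID
				hit = true
			}
		}
		if hit {
			rewired = append(rewired, name)
		}
	}
	sort.Strings(rewired)
	return rewired, nil
}
```

Checking the inputs and outputs separately, each with its own message, means a malformed reshuffle reports which side was wrong, instead of producing two identical "output" panics.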


