You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/29 14:04:10 UTC

[GitHub] [beam] riteshghorse commented on a diff in pull request #24346: [#24339] Make Slices use iterable coder instead of custom coder.

riteshghorse commented on code in PR #24346:
URL: https://github.com/apache/beam/pull/24346#discussion_r1034747217


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error {
 }
 
 func (n *DataSource) String() string {
-	return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID())
+	return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Coder, n.Out.ID())

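Review Comment:
   The format string now prints `Out` before `Coder`, but the arguments are still passed in the old order (`n.Coder, n.Out.ID()`), so the two values end up under the wrong labels. Swap the arguments to match:
   ```suggestion
   	return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Out.ID(), n.Coder)
   ```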



##########
sdks/go/pkg/beam/core/runtime/exec/translate.go:
##########
@@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
 			b.units = b.units[:len(b.units)-1]
 		}
 
+		mayFixDataSourceCoder(u)
 		b.units = append(b.units, u)
 	}
 	return b.build()
 }
 
+// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes
+// a KV<k, Iter<V>> coder to a CoGBK<k, v>. This requires knowledge of the downstream node because
+// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right
+// at execution time with knowledge of both.
+func mayFixDataSourceCoder(u *DataSource) {
+	if !coder.IsKV(coder.SkipW(u.Coder)) {
+		return // If it's not a KV, there's nothing to do here.
+	}
+	if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable {
+		return // If the V is not an iterable, we don't care.
+	}
+	out := u.Out
+	if mp, ok := out.(*Multiplex); ok {
+		// Here we trust that the Multiplex Outs are all the same signature, since we've validated
+		// that at construction time.
+		out = mp.Out[0]
+	}
+
+	// Expand, *Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.

Review Comment:
   Nit: drop the `*` before `Combine` so it reads consistently with the other node names in the list.
   ```suggestion
   	// Expand, Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org