Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/22 23:55:36 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #24307: Add map_windows support to Go SDK

lostluck commented on code in PR #24307:
URL: https://github.com/apache/beam/pull/24307#discussion_r1029897281


##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
 	Iterable           Kind = "I"
 	KV                 Kind = "KV"
 	LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+	IWCValue           Kind = "IWCvalue"

Review Comment:
   Note how nothing else in this `coder` package calls out its coderness in its short name. I'd just call this `IntervalWindow`, with `IW` as the short version.
   
   The 'value' suffixes on WindowedValue and ParamWindowedValue indicate that they wrap a value, rather than being windows themselves like the one we're adding here.
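
A minimal sketch of the naming suggested in the comment above. The `Kind` type and the neighbouring constants are copied from the diff context; everything else in the real `coder` package is omitted, so this shows the shape of the rename rather than the final patch.

```go
package main

import "fmt"

// Kind mirrors the string-typed coder kind shown in the diff context.
type Kind string

const (
	Iterable Kind = "I"
	KV       Kind = "KV"
	LP       Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

	// Named after the window type itself, with no "coder" or "value"
	// suffix, as the review comment suggests. The final name in the
	// merged change may differ.
	IntervalWindow Kind = "IW"
)

func main() {
	fmt.Println(IntervalWindow)
}
```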



##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
 	Iterable           Kind = "I"
 	KV                 Kind = "KV"
 	LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+	IWCValue           Kind = "IWCvalue"
 
 	Window Kind = "window" // A debug wrapper around a window coder.

Review Comment:
   It bothers me that we can't re-use this, but I agree we shouldn't mix these up just now. It's an internal implementation detail at least, so it can be cleaned up at some future point.



##########
sdks/go/pkg/beam/core/runtime/graphx/coder.go:
##########
@@ -245,6 +245,10 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
 				t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
 				return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
 			}
+		case urnIntervalWindow:
+			// If interval window in a KV, this may be a mapping function.
+			// Special case since windows are not normally used directly as FullValues.
+			return coder.NewIntervalWindowCoder(), nil

Review Comment:
   Per the proto "map_windows" spec, the input/output is "KV<[]byte,Window>", but as written here this just returns "window". It'd be better to follow the normal structure and generalize to handle interval windows better, instead of special-casing here.
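
To make the shape in the comment above concrete, here is a rough sketch of a `KV<[]byte,Window>` coder tree. `Coder`, `Kind`, and `newMapWindowsCoder` are simplified, hypothetical stand-ins for illustration only; they are not the Beam `coder` package API, and the kind strings are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// Kind is a stand-in for the coder kind; the values here are illustrative.
type Kind string

const (
	Bytes          Kind = "bytes"
	KV             Kind = "KV"
	IntervalWindow Kind = "IW"
)

// Coder is a hypothetical, reduced coder node: a kind plus component coders.
type Coder struct {
	Kind       Kind
	Components []*Coder
}

// newMapWindowsCoder builds the KV structure the comment describes:
// a bytes key (the nonce) and an interval window as the value component,
// instead of returning a bare window coder.
func newMapWindowsCoder() *Coder {
	return &Coder{
		Kind:       KV,
		Components: []*Coder{{Kind: Bytes}, {Kind: IntervalWindow}},
	}
}

// String renders the coder tree, e.g. a KV node with its components.
func (c *Coder) String() string {
	if len(c.Components) == 0 {
		return string(c.Kind)
	}
	parts := make([]string, len(c.Components))
	for i, comp := range c.Components {
		parts[i] = comp.String()
	}
	return fmt.Sprintf("%s<%s>", c.Kind, strings.Join(parts, ","))
}

func main() {
	fmt.Println(newMapWindowsCoder())
}
```

The point of the sketch is only that the interval window appears as an ordinary component of the KV coder, so the unmarshaller would not need a special case that drops the key.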



##########
sdks/go/pkg/beam/core/runtime/exec/window.go:
##########
@@ -97,6 +98,55 @@ func (w *WindowInto) String() string {
 	return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
 }
 
+type MapWindows struct {
+	UID UnitID
+	Fn  WindowMapper
+	Out Node
+}
+
+func (m *MapWindows) ID() UnitID {
+	return m.UID
+}
+
+func (m *MapWindows) Up(_ context.Context) error {
+	return nil
+}
+
+func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return m.Out.StartBundle(ctx, id, data)
+}
+
+func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	w, ok := elm.Elm.(window.IntervalWindow)
+	if !ok {
+		return errors.Errorf("not an IntervalWindow, got %T", elm.Elm)
+	}
+	newW, err := m.Fn.MapWindow(w)
+	if err != nil {
+		return err
+	}
+	out := &FullValue{
+		Elm:       elm.Elm,

Review Comment:
   As written here, it looks like we assume the output nonce is the same as the input window, which may not be the case in all runners. As the coder is written, and picked for a KV, it looks like the mapped element never gets coded.
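
One way to read the concern above: the element should be treated as a KV whose key is the runner's nonce and whose value is the window, so that the nonce passes through unchanged and only the window is mapped. The sketch below uses simplified, hypothetical stand-ins (`FullValue`, `IntervalWindow`, `WindowMapper`, `fixedMapper`, `mapElement`) rather than the real `exec` and `window` types, and assumes the nonce arrives as `[]byte`.

```go
package main

import "fmt"

// IntervalWindow is a stand-in for window.IntervalWindow.
type IntervalWindow struct{ Start, End int64 }

// FullValue is a reduced stand-in: Elm holds the KV key, Elm2 the KV value.
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// WindowMapper matches the MapWindow call shown in the diff.
type WindowMapper interface {
	MapWindow(w IntervalWindow) (IntervalWindow, error)
}

// fixedMapper is a hypothetical mapper that snaps a window to the fixed
// window of the given size containing its start (non-negative starts only).
type fixedMapper struct{ size int64 }

func (f fixedMapper) MapWindow(w IntervalWindow) (IntervalWindow, error) {
	start := w.Start - w.Start%f.size
	return IntervalWindow{Start: start, End: start + f.size}, nil
}

// mapElement keeps the nonce (key) as-is and replaces only the window (value).
func mapElement(fn WindowMapper, elm *FullValue) (*FullValue, error) {
	nonce, ok := elm.Elm.([]byte)
	if !ok {
		return nil, fmt.Errorf("key is not a []byte nonce, got %T", elm.Elm)
	}
	w, ok := elm.Elm2.(IntervalWindow)
	if !ok {
		return nil, fmt.Errorf("value is not an IntervalWindow, got %T", elm.Elm2)
	}
	newW, err := fn.MapWindow(w)
	if err != nil {
		return nil, err
	}
	return &FullValue{Elm: nonce, Elm2: newW}, nil
}

func main() {
	in := &FullValue{Elm: []byte("nonce-1"), Elm2: IntervalWindow{Start: 25, End: 26}}
	out, err := mapElement(fixedMapper{size: 10}, in)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %+v\n", out.Elm, out.Elm2)
}
```

With the key and value kept separate like this, the mapped window is the part that would be handed to the value coder, rather than the input window being echoed back as the element.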



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.