You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/19 16:50:14 UTC
[beam] branch master updated: [Go SDK] Add timer coder support (#23222)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 118454dded3 [Go SDK] Add timer coder support (#23222)
118454dded3 is described below
commit 118454dded38e8859384a21b6153f38c8cc06ec3
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Mon Sep 19 12:50:06 2022 -0400
[Go SDK] Add timer coder support (#23222)
---
sdks/go/pkg/beam/core/graph/coder/coder.go | 12 +--
sdks/go/pkg/beam/core/runtime/exec/coder.go | 101 ++++++++++++++++++++-
sdks/go/pkg/beam/core/runtime/exec/coder_test.go | 20 ----
sdks/go/pkg/beam/core/runtime/exec/timers_test.go | 96 ++++++++++++++++++++
sdks/go/pkg/beam/core/runtime/graphx/coder.go | 14 ++-
sdks/go/pkg/beam/core/typex/class.go | 2 +-
sdks/go/pkg/beam/core/typex/fulltype.go | 2 +
sdks/go/pkg/beam/core/typex/special.go | 20 ++++
.../go/test/regression/coders/fromyaml/fromyaml.go | 83 ++++++++++++++++-
9 files changed, 314 insertions(+), 36 deletions(-)
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index a368be7043f..3ee83502f34 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -347,17 +347,9 @@ func NewT(c *Coder, w *WindowCoder) *Coder {
panic("window must not be nil")
}
- // TODO(https://github.com/apache/beam/issues/20510): Implement proper timer support.
return &Coder{
- Kind: Timer,
- T: typex.New(reflect.TypeOf((*struct {
- Key []byte // elm type.
- Tag string
- Windows []byte // []typex.Window
- Clear bool
- FireTimestamp, HoldTimestamp int64
- Span int
- })(nil)).Elem()),
+ Kind: Timer,
+ T: typex.New(typex.TimersType),
Window: w,
Components: []*Coder{c},
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 75cb1cca231..0421b253bbe 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -145,6 +145,7 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
case coder.Timer:
return &timerEncoder{
elm: MakeElementEncoder(c.Components[0]),
+ win: MakeWindowEncoder(c.Window),
}
case coder.Row:
@@ -262,6 +263,7 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
case coder.Timer:
return &timerDecoder{
elm: MakeElementDecoder(c.Components[0]),
+ win: MakeWindowDecoder(c.Window),
}
case coder.Row:
@@ -890,18 +892,25 @@ func (d *paramWindowedValueDecoder) Decode(r io.Reader) (*FullValue, error) {
type timerEncoder struct {
elm ElementEncoder
+ win WindowEncoder
}
func (e *timerEncoder) Encode(val *FullValue, w io.Writer) error {
- return e.elm.Encode(val, w)
+ return encodeTimer(e.elm, e.win, val.Elm.(typex.TimerMap), w)
}
type timerDecoder struct {
elm ElementDecoder
+ win WindowDecoder
}
func (d *timerDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
- return d.elm.DecodeTo(r, fv)
+ data, err := decodeTimer(d.elm, d.win, r)
+ if err != nil {
+ return err
+ }
+ fv.Elm = data
+ return nil
}
func (d *timerDecoder) Decode(r io.Reader) (*FullValue, error) {
@@ -1208,3 +1217,91 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window,
return ws, t, pn, nil
}
+
+// encodeTimer writes tm to w. The fields are written in the order key, tag,
+// windows and clear bit; the fire timestamp, hold timestamp and pane follow
+// only when the clear bit is unset.
+//
+// The timer is staged in a local buffer and handed to w in a single Write
+// after every field has been encoded, so an encoding failure part-way through
+// writes nothing to w. Any error, including one from the final Write, is
+// returned to the caller.
+func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io.Writer) error {
+	var b bytes.Buffer
+
+	// The key goes through the element encoder supplied by the caller.
+	if err := elm.Encode(&FullValue{Elm: tm.Key}, &b); err != nil {
+		return errors.WithContext(err, "error encoding key")
+	}
+
+	if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil {
+		return errors.WithContext(err, "error encoding tag")
+	}
+
+	if err := win.Encode(tm.Windows, &b); err != nil {
+		return errors.WithContext(err, "error encoding window")
+	}
+	if err := coder.EncodeBool(tm.Clear, &b); err != nil {
+		return errors.WithContext(err, "error encoding clear bit")
+	}
+
+	// A cleared timer carries no timestamps or pane.
+	if !tm.Clear {
+		if err := coder.EncodeEventTime(tm.FireTimestamp, &b); err != nil {
+			return errors.WithContext(err, "error encoding fire timestamp")
+		}
+		if err := coder.EncodeEventTime(tm.HoldTimestamp, &b); err != nil {
+			return errors.WithContext(err, "error encoding hold timestamp")
+		}
+		if err := coder.EncodePane(tm.Pane, &b); err != nil {
+			return errors.WithContext(err, "error encoding paneinfo")
+		}
+	}
+
+	if _, err := w.Write(b.Bytes()); err != nil {
+		return errors.WithContext(err, "error writing timer")
+	}
+	return nil
+}
+
+// decodeTimer reads one timer from r, mirroring encodeTimer: key, tag,
+// windows and clear bit, followed by the fire timestamp, hold timestamp and
+// pane when the clear bit is unset.
+//
+// The key is read with dec and must decode to a string; any other key type is
+// reported as an error instead of panicking. On error the returned TimerMap
+// holds whatever fields were decoded before the failure and should not be used.
+func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (typex.TimerMap, error) {
+	tm := typex.TimerMap{}
+
+	fv, err := dec.Decode(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding timer key")
+	}
+	key, ok := fv.Elm.(string)
+	if !ok {
+		return tm, errors.Errorf("error decoding timer key: got %T, want string", fv.Elm)
+	}
+	tm.Key = key
+
+	// io.EOF is tolerated for the tag: whatever DecodeStringUTF8 returned
+	// alongside it is used as the tag.
+	tag, err := coder.DecodeStringUTF8(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding timer tag")
+	}
+	tm.Tag = tag
+
+	w, err := win.Decode(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding timer window")
+	}
+	tm.Windows = w
+
+	c, err := coder.DecodeBool(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding clear")
+	}
+	tm.Clear = c
+	// A cleared timer ends here; encodeTimer writes nothing after the clear bit.
+	if tm.Clear {
+		return tm, nil
+	}
+
+	// NOTE(review): io.EOF is deliberately not treated as an error for the
+	// remaining fields, so a stream that ends early yields whatever the
+	// decoders returned - confirm this leniency is intended.
+	ft, err := coder.DecodeEventTime(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding ft")
+	}
+	tm.FireTimestamp = ft
+
+	ht, err := coder.DecodeEventTime(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding ht")
+	}
+	tm.HoldTimestamp = ht
+
+	pn, err := coder.DecodePane(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding pn")
+	}
+	tm.Pane = pn
+
+	return tm, nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
index 118b8668111..f69aadbb297 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
@@ -188,26 +188,6 @@ func TestIterableCoder(t *testing.T) {
}
}
-// TODO(https://github.com/apache/beam/issues/20510): Update once proper timer support is added
-func TestTimerCoder(t *testing.T) {
- var buf bytes.Buffer
- tCoder := coder.NewT(coder.NewVarInt(), coder.NewGlobalWindow())
- wantVal := &FullValue{Elm: int64(13)}
-
- enc := MakeElementEncoder(tCoder)
- if err := enc.Encode(wantVal, &buf); err != nil {
- t.Fatalf("Couldn't encode value: %v", err)
- }
-
- dec := MakeElementDecoder(tCoder)
- result, err := dec.Decode(&buf)
- if err != nil {
- t.Fatalf("Couldn't decode value: %v", err)
- }
-
- compareFV(t, result, wantVal)
-}
-
type namedTypeForTest struct {
A, B int64
C string
diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go
new file mode 100644
index 00000000000..d25f70b94e0
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// equalTimers reports whether two timers agree on key, tag, clear bit and
+// both timestamps. Windows and Pane are not compared.
+func equalTimers(a, b typex.TimerMap) bool {
+	return a.Key == b.Key &&
+		a.Tag == b.Tag &&
+		a.Clear == b.Clear &&
+		a.FireTimestamp == b.FireTimestamp &&
+		a.HoldTimestamp == b.HoldTimestamp
+}
+
+// TestTimerEncodingDecoding encodes each typex.TimerMap with the element
+// encoder built from a timer coder, decodes the bytes again, and checks via
+// equalTimers whether the decoded timer matches the input.
+func TestTimerEncodingDecoding(t *testing.T) {
+ tc := coder.NewT(coder.NewString(), window.NewGlobalWindows().Coder())
+ ec := MakeElementEncoder(coder.SkipW(tc))
+ dec := MakeElementDecoder(coder.SkipW(tc))
+
+ tests := []struct {
+ name string
+ tm typex.TimerMap
+ // result is the value equalTimers is expected to return when comparing
+ // the decoded timer with tm.
+ result bool
+ }{
+ {
+ name: "all set fields",
+ tm: typex.TimerMap{
+ Key: "Basic",
+ Tag: "first",
+ Windows: window.SingleGlobalWindow,
+ Clear: false,
+ FireTimestamp: mtime.Now(),
+ },
+ result: true,
+ },
+ {
+ name: "without tag",
+ tm: typex.TimerMap{
+ Key: "Basic",
+ Tag: "",
+ Windows: window.SingleGlobalWindow,
+ Clear: false,
+ FireTimestamp: mtime.Now(),
+ },
+ result: true,
+ },
+ {
+ name: "with clear set",
+ tm: typex.TimerMap{
+ Key: "Basic",
+ Tag: "first",
+ Windows: window.SingleGlobalWindow,
+ Clear: true,
+ FireTimestamp: mtime.Now(),
+ },
+ // A mismatch is expected: no timestamps are encoded when Clear is
+ // set, so the decoded FireTimestamp is left at its zero value.
+ result: false,
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ fv := FullValue{Elm: test.tm}
+ var buf bytes.Buffer
+ err := ec.Encode(&fv, &buf)
+ if err != nil {
+ t.Fatalf("error encoding timer: %#v, got: %v", test.tm, err)
+ }
+
+ gotFv, err := dec.Decode(&buf)
+ if err != nil {
+ t.Fatalf("failed to decode timer, got %v", err)
+ }
+
+ // The case passes when equalTimers agrees with the expected result.
+ if got, want := gotFv.Elm.(typex.TimerMap), test.tm; test.result != equalTimers(got, want) {
+ t.Errorf("got timer %v, want %v", got, want)
+ }
+ })
+ }
+
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 60a22a26038..1e57a42587a 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -537,7 +537,19 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
}
return b.internRowCoder(s), nil
- // TODO(https://github.com/apache/beam/issues/20510): Handle coder.Timer support.
+ case coder.Timer:
+ comp := []string{}
+ if ids, err := b.AddMulti(c.Components); err != nil {
+ return "", errors.SetTopLevelMsgf(err, "failed to marshal timer coder %v", c)
+ } else {
+ comp = append(comp, ids...)
+ }
+ if id, err := b.AddWindowCoder(c.Window); err != nil {
+ return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+ } else {
+ comp = append(comp, id)
+ }
+ return b.internBuiltInCoder(urnTimerCoder, comp...), nil
default:
err := errors.Errorf("unexpected coder kind: %v", c.Kind)
diff --git a/sdks/go/pkg/beam/core/typex/class.go b/sdks/go/pkg/beam/core/typex/class.go
index 028e8a2db91..e112495ee98 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -234,7 +234,7 @@ func IsUniversal(t reflect.Type) bool {
// Composite marker types: KV, CoGBK or WindowedValue.
func IsComposite(t reflect.Type) bool {
switch t {
- case KVType, CoGBKType, WindowedValueType:
+ case KVType, CoGBKType, WindowedValueType, TimersType:
return true
default:
return false
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go
index df5425a4e1a..386acad3818 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -139,6 +139,8 @@ func New(t reflect.Type, components ...FullType) FullType {
panic("Invalid to nest composites inside CoGBK")
}
return &tree{class, t, components}
+ case TimersType:
+ return &tree{class, t, components}
default:
panic(fmt.Sprintf("Unexpected composite type: %v", t))
}
diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go
index 334436872a2..c6512057b54 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -36,6 +36,7 @@ var (
EventTimeType = reflect.TypeOf((*EventTime)(nil)).Elem()
WindowType = reflect.TypeOf((*Window)(nil)).Elem()
+ TimersType = reflect.TypeOf((*Timers)(nil)).Elem()
PaneInfoType = reflect.TypeOf((*PaneInfo)(nil)).Elem()
KVType = reflect.TypeOf((*KV)(nil)).Elem()
@@ -88,6 +89,25 @@ type PaneInfo struct {
Index, NonSpeculativeIndex int64
}
+// Timers is the type used for the standard timer coder; TimersType is its
+// reflect.Type.
+type Timers struct {
+ Key []byte // element type; presumably the encoded user key - TODO confirm.
+ Tag string
+ Windows []byte // presumably an encoded []typex.Window - TODO confirm.
+ Clear bool
+ FireTimestamp, HoldTimestamp mtime.Time
+ Pane PaneInfo
+}
+
+// TimerMap holds the details of a single timer. It is the value passed to the
+// timer encoder and produced by the timer decoder.
+type TimerMap struct {
+ // Key is the user key and Tag the timer tag.
+ Key, Tag string
+ // Windows lists the windows the timer applies to.
+ Windows []Window
+ // Clear is the clear bit; the timestamps and pane are only encoded when
+ // it is false.
+ Clear bool
+ FireTimestamp, HoldTimestamp mtime.Time
+ Pane PaneInfo
+}
+
// KV, Nullable, CoGBK, WindowedValue represent composite generic types. They are not used
// directly in user code signatures, but only in FullTypes.
diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
index 69d59d201c4..2a8476fe65a 100644
--- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go
+++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
@@ -34,6 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/google/go-cmp/cmp"
@@ -43,7 +44,6 @@ import (
var unimplementedCoders = map[string]bool{
"beam:coder:param_windowed_value:v1": true,
- "beam:coder:timer:v1": true,
"beam:coder:sharded_key:v1": true,
"beam:coder:custom_window:v1": true,
}
@@ -294,7 +294,9 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
if !diff(c.Components[1], elem, vs[3]) {
pass = false
}
- // TODO compare pane information.
+ if !diffPane(vs[2].Value, elem.Pane) {
+ pass = false
+ }
return pass
case "beam:coder:row:v1":
fs := eg.Value.(yaml.MapSlice)
@@ -320,6 +322,48 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
}
got, want = elem.Elm, rv.Interface()
+ case "beam:coder:timer:v1":
+ pass := true
+ tm := elem.Elm.(typex.TimerMap)
+ fs := eg.Value.(yaml.MapSlice)
+ for _, item := range fs {
+
+ switch item.Key.(string) {
+ case "userKey":
+ if want := item.Value.(string); want != tm.Key {
+ pass = false
+ }
+ case "dynamicTimerTag":
+ if want := item.Value.(string); want != tm.Tag {
+ pass = false
+ }
+ case "windows":
+ if v, ok := item.Value.([]interface{}); ok {
+ for i, val := range v {
+ if val.(string) == "global" && fmt.Sprintf("%s", tm.Windows[i]) == "[*]" {
+ continue
+ } else if val.(string) != fmt.Sprintf("%s", tm.Windows[i]) {
+ pass = false
+ }
+ }
+ }
+ case "clearBit":
+ if want := item.Value.(bool); want != tm.Clear {
+ pass = false
+ }
+ case "fireTimestamp":
+ if want := item.Value.(int); want != int(tm.FireTimestamp) {
+ pass = false
+ }
+ case "holdTimestamp":
+ if want := item.Value.(int); want != int(tm.HoldTimestamp) {
+ pass = false
+ }
+ case "pane":
+ pass = diffPane(item.Value, tm.Pane)
+ }
+ }
+ return pass
default:
got, want = elem.Elm, eg.Value
}
@@ -330,6 +374,41 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
return true
}
+func diffPane(eg interface{}, got typex.PaneInfo) bool {
+ pass := true
+ paneTiming := map[typex.PaneTiming]string{
+ typex.PaneUnknown: "UNKNOWN",
+ typex.PaneEarly: "EARLY",
+ typex.PaneLate: "LATE",
+ typex.PaneOnTime: "ONTIME",
+ }
+ for _, item := range eg.(yaml.MapSlice) {
+ switch item.Key.(string) {
+ case "is_first":
+ if want := item.Value.(bool); want != got.IsFirst {
+ pass = false
+ }
+ case "is_last":
+ if want := item.Value.(bool); want != got.IsLast {
+ pass = false
+ }
+ case "timing":
+ if want := item.Value.(string); want != paneTiming[got.Timing] {
+ pass = false
+ }
+ case "index":
+ if want := item.Value.(int); want != int(got.Index) {
+ pass = false
+ }
+ case "on_time_index":
+ if want := item.Value.(int); want != int(got.NonSpeculativeIndex) {
+ pass = false
+ }
+ }
+ }
+ return pass
+}
+
// standard_coders.yaml uses the name for type indication, except for nullability.
var nameToType = map[string]reflect.Type{
"str": reflectx.String,