You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/09/19 16:50:14 UTC

[beam] branch master updated: [Go SDK] Add timer coder support (#23222)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 118454dded3 [Go SDK] Add timer coder support (#23222)
118454dded3 is described below

commit 118454dded38e8859384a21b6153f38c8cc06ec3
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Mon Sep 19 12:50:06 2022 -0400

    [Go SDK] Add timer coder support (#23222)
---
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  12 +--
 sdks/go/pkg/beam/core/runtime/exec/coder.go        | 101 ++++++++++++++++++++-
 sdks/go/pkg/beam/core/runtime/exec/coder_test.go   |  20 ----
 sdks/go/pkg/beam/core/runtime/exec/timers_test.go  |  96 ++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  14 ++-
 sdks/go/pkg/beam/core/typex/class.go               |   2 +-
 sdks/go/pkg/beam/core/typex/fulltype.go            |   2 +
 sdks/go/pkg/beam/core/typex/special.go             |  20 ++++
 .../go/test/regression/coders/fromyaml/fromyaml.go |  83 ++++++++++++++++-
 9 files changed, 314 insertions(+), 36 deletions(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index a368be7043f..3ee83502f34 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -347,17 +347,9 @@ func NewT(c *Coder, w *WindowCoder) *Coder {
 		panic("window must not be nil")
 	}
 
-	// TODO(https://github.com/apache/beam/issues/20510): Implement proper timer support.
 	return &Coder{
-		Kind: Timer,
-		T: typex.New(reflect.TypeOf((*struct {
-			Key                          []byte // elm type.
-			Tag                          string
-			Windows                      []byte // []typex.Window
-			Clear                        bool
-			FireTimestamp, HoldTimestamp int64
-			Span                         int
-		})(nil)).Elem()),
+		Kind:       Timer,
+		T:          typex.New(typex.TimersType),
 		Window:     w,
 		Components: []*Coder{c},
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 75cb1cca231..0421b253bbe 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -145,6 +145,7 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
 	case coder.Timer:
 		return &timerEncoder{
 			elm: MakeElementEncoder(c.Components[0]),
+			win: MakeWindowEncoder(c.Window),
 		}
 
 	case coder.Row:
@@ -262,6 +263,7 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
 	case coder.Timer:
 		return &timerDecoder{
 			elm: MakeElementDecoder(c.Components[0]),
+			win: MakeWindowDecoder(c.Window),
 		}
 
 	case coder.Row:
@@ -890,18 +892,25 @@ func (d *paramWindowedValueDecoder) Decode(r io.Reader) (*FullValue, error) {
 
 type timerEncoder struct {
 	elm ElementEncoder
+	win WindowEncoder // encodes TimerMap.Windows
 }

 func (e *timerEncoder) Encode(val *FullValue, w io.Writer) error {
-	return e.elm.Encode(val, w)
+	return encodeTimer(e.elm, e.win, val.Elm.(typex.TimerMap), w) // panics unless val.Elm is a typex.TimerMap
 }

 type timerDecoder struct {
 	elm ElementDecoder
+	win WindowDecoder // decodes TimerMap.Windows
 }

 func (d *timerDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
-	return d.elm.DecodeTo(r, fv)
+	data, err := decodeTimer(d.elm, d.win, r)
+	if err != nil {
+		return err
+	}
+	fv.Elm = data // a typex.TimerMap; fv is not modified when decoding fails
+	return nil
 }
 
 func (d *timerDecoder) Decode(r io.Reader) (*FullValue, error) {
@@ -1208,3 +1217,111 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window,

 	return ws, t, pn, nil
 }
+
+// encodeTimer encodes tm onto w using the standard timer coder layout: the
+// user key (encoded with elm), the dynamic timer tag as a UTF-8 string, the
+// windows (encoded with win) and the clear bit, followed by the fire
+// timestamp, hold timestamp and pane only when the clear bit is false.
+//
+// The encoding is assembled in a buffer and passed to w in a single Write, so
+// w receives nothing if any component fails to encode.
+func encodeTimer(elm ElementEncoder, win WindowEncoder, tm typex.TimerMap, w io.Writer) error {
+	var b bytes.Buffer
+
+	if err := elm.Encode(&FullValue{Elm: tm.Key}, &b); err != nil {
+		return errors.WithContext(err, "error encoding timer key")
+	}
+
+	if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil {
+		return errors.WithContext(err, "error encoding tag")
+	}
+
+	if err := win.Encode(tm.Windows, &b); err != nil {
+		return errors.WithContext(err, "error encoding window")
+	}
+	if err := coder.EncodeBool(tm.Clear, &b); err != nil {
+		return errors.WithContext(err, "error encoding clear bit")
+	}
+
+	// A cleared timer ends at the clear bit: no timestamps or pane follow.
+	if !tm.Clear {
+		if err := coder.EncodeEventTime(tm.FireTimestamp, &b); err != nil {
+			return errors.WithContext(err, "error encoding fire timestamp")
+		}
+		if err := coder.EncodeEventTime(tm.HoldTimestamp, &b); err != nil {
+			return errors.WithContext(err, "error encoding hold timestamp")
+		}
+		if err := coder.EncodePane(tm.Pane, &b); err != nil {
+			return errors.WithContext(err, "error encoding paneinfo")
+		}
+	}
+
+	if _, err := w.Write(b.Bytes()); err != nil {
+		return errors.WithContext(err, "error writing timer")
+	}
+	return nil
+}
+
+// decodeTimer decodes a timer from r that was encoded with the standard timer
+// coder layout written by encodeTimer. The key is decoded with dec and must be
+// a string, since typex.TimerMap stores the key as a string; the windows are
+// decoded with win. When the clear bit is set, decoding stops after it and the
+// timestamps and pane are left at their zero values.
+//
+// NOTE(review): io.EOF while reading the tag, timestamps or pane is not
+// reported, which also hides a truncated stream; confirm this is intended.
+func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (typex.TimerMap, error) {
+	tm := typex.TimerMap{}
+
+	fv, err := dec.Decode(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding timer key")
+	}
+	key, ok := fv.Elm.(string)
+	if !ok {
+		return tm, errors.Errorf("error decoding timer key: got %T, want string", fv.Elm)
+	}
+	tm.Key = key
+
+	// io.EOF is tolerated here so that an empty tag is accepted.
+	s, err := coder.DecodeStringUTF8(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding timer tag")
+	}
+	tm.Tag = s
+
+	w, err := win.Decode(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding timer window")
+	}
+	tm.Windows = w
+
+	c, err := coder.DecodeBool(r)
+	if err != nil {
+		return tm, errors.WithContext(err, "error decoding clear")
+	}
+	tm.Clear = c
+	if tm.Clear {
+		return tm, nil
+	}
+
+	ft, err := coder.DecodeEventTime(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding ft")
+	}
+	tm.FireTimestamp = ft
+
+	ht, err := coder.DecodeEventTime(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding ht")
+	}
+	tm.HoldTimestamp = ht
+
+	pn, err := coder.DecodePane(r)
+	if err != nil && err != io.EOF {
+		return tm, errors.WithContext(err, "error decoding pn")
+	}
+	tm.Pane = pn
+
+	return tm, nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
index 118b8668111..f69aadbb297 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
@@ -188,26 +188,6 @@ func TestIterableCoder(t *testing.T) {
 	}
 }
 
-// TODO(https://github.com/apache/beam/issues/20510): Update once proper timer support is added
-func TestTimerCoder(t *testing.T) {
-	var buf bytes.Buffer
-	tCoder := coder.NewT(coder.NewVarInt(), coder.NewGlobalWindow())
-	wantVal := &FullValue{Elm: int64(13)}
-
-	enc := MakeElementEncoder(tCoder)
-	if err := enc.Encode(wantVal, &buf); err != nil {
-		t.Fatalf("Couldn't encode value: %v", err)
-	}
-
-	dec := MakeElementDecoder(tCoder)
-	result, err := dec.Decode(&buf)
-	if err != nil {
-		t.Fatalf("Couldn't decode value: %v", err)
-	}
-
-	compareFV(t, result, wantVal)
-}
-
 type namedTypeForTest struct {
 	A, B int64
 	C    string
diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go
new file mode 100644
index 00000000000..d25f70b94e0
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// equalTimers reports whether a and b agree on every TimerMap field; windows
+// are compared pairwise with typex.Window.Equals.
+func equalTimers(a, b typex.TimerMap) bool {
+	if a.Key != b.Key || a.Tag != b.Tag || a.Clear != b.Clear ||
+		a.FireTimestamp != b.FireTimestamp || a.HoldTimestamp != b.HoldTimestamp ||
+		a.Pane != b.Pane || len(a.Windows) != len(b.Windows) {
+		return false
+	}
+	for i := range a.Windows {
+		if !a.Windows[i].Equals(b.Windows[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+// TestTimerEncodingDecoding round-trips timers through the standard timer
+// coder and checks every field, except that a cleared timer's timestamps and
+// pane are expected to come back zeroed because the coder does not encode them.
+func TestTimerEncodingDecoding(t *testing.T) {
+	tc := coder.NewT(coder.NewString(), window.NewGlobalWindows().Coder())
+	ec := MakeElementEncoder(coder.SkipW(tc))
+	dec := MakeElementDecoder(coder.SkipW(tc))
+
+	fire := mtime.FromMilliseconds(1663600000000)
+	hold := mtime.FromMilliseconds(1663599990000)
+
+	tests := []struct {
+		name string
+		tm   typex.TimerMap
+	}{
+		{
+			name: "all set fields",
+			tm: typex.TimerMap{
+				Key:           "Basic",
+				Tag:           "first",
+				Windows:       window.SingleGlobalWindow,
+				Clear:         false,
+				FireTimestamp: fire,
+				HoldTimestamp: hold,
+				Pane:          typex.PaneInfo{Timing: typex.PaneOnTime, IsFirst: true, IsLast: true},
+			},
+		},
+		{
+			name: "without tag",
+			tm: typex.TimerMap{
+				Key:           "Basic",
+				Tag:           "",
+				Windows:       window.SingleGlobalWindow,
+				Clear:         false,
+				FireTimestamp: fire,
+			},
+		},
+		{
+			name: "with clear set",
+			tm: typex.TimerMap{
+				Key:           "Basic",
+				Tag:           "first",
+				Windows:       window.SingleGlobalWindow,
+				Clear:         true,
+				FireTimestamp: fire,
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			fv := FullValue{Elm: test.tm}
+			var buf bytes.Buffer
+			if err := ec.Encode(&fv, &buf); err != nil {
+				t.Fatalf("error encoding timer: %#v, got: %v", test.tm, err)
+			}
+
+			gotFv, err := dec.Decode(&buf)
+			if err != nil {
+				t.Fatalf("failed to decode timer, got %v", err)
+			}
+			got, ok := gotFv.Elm.(typex.TimerMap)
+			if !ok {
+				t.Fatalf("decoded element is %T, want typex.TimerMap", gotFv.Elm)
+			}
+
+			// A cleared timer is encoded without timestamps or pane, so those
+			// fields decode to their zero values.
+			want := test.tm
+			if want.Clear {
+				want.FireTimestamp, want.HoldTimestamp, want.Pane = 0, 0, typex.PaneInfo{}
+			}
+			if !equalTimers(got, want) {
+				t.Errorf("got timer %+v, want %+v", got, want)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 60a22a26038..1e57a42587a 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -537,7 +537,17 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {
 		}
 		return b.internRowCoder(s), nil

-	// TODO(https://github.com/apache/beam/issues/20510): Handle coder.Timer support.
+	case coder.Timer:
+		// The timer coder's components are the component coder IDs followed by the window coder ID.
+		comp, err := b.AddMulti(c.Components)
+		if err != nil {
+			return "", errors.SetTopLevelMsgf(err, "failed to marshal timer coder %v", c)
+		}
+		win, err := b.AddWindowCoder(c.Window)
+		if err != nil {
+			return "", errors.Wrapf(err, "failed to marshal window coder %v", c)
+		}
+		return b.internBuiltInCoder(urnTimerCoder, append(comp, win)...), nil
 
 	default:
 		err := errors.Errorf("unexpected coder kind: %v", c.Kind)
diff --git a/sdks/go/pkg/beam/core/typex/class.go b/sdks/go/pkg/beam/core/typex/class.go
index 028e8a2db91..e112495ee98 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -234,7 +234,7 @@ func IsUniversal(t reflect.Type) bool {
 // Composite marker types: KV, CoGBK or WindowedValue.
 func IsComposite(t reflect.Type) bool {
 	switch t {
-	case KVType, CoGBKType, WindowedValueType:
+	case KVType, CoGBKType, WindowedValueType, TimersType:
 		return true
 	default:
 		return false
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go
index df5425a4e1a..386acad3818 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -139,6 +139,8 @@ func New(t reflect.Type, components ...FullType) FullType {
 				panic("Invalid to nest composites inside CoGBK")
 			}
 			return &tree{class, t, components}
+		case TimersType:
+			return &tree{class, t, components}
 		default:
 			panic(fmt.Sprintf("Unexpected composite type: %v", t))
 		}
diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go
index 334436872a2..c6512057b54 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -36,6 +36,7 @@ var (
 
 	EventTimeType = reflect.TypeOf((*EventTime)(nil)).Elem()
 	WindowType    = reflect.TypeOf((*Window)(nil)).Elem()
+	TimersType    = reflect.TypeOf((*Timers)(nil)).Elem()
 	PaneInfoType  = reflect.TypeOf((*PaneInfo)(nil)).Elem()
 
 	KVType                 = reflect.TypeOf((*KV)(nil)).Elem()
@@ -88,6 +89,25 @@ type PaneInfo struct {
 	Index, NonSpeculativeIndex int64
 }
 
+// Timers is the marker type behind TimersType, the FullType of timer coders (values are TimerMap).
+type Timers struct {
+	Key                          []byte // elm type.
+	Tag                          string
+	Windows                      []byte // []typex.Window
+	Clear                        bool
+	FireTimestamp, HoldTimestamp mtime.Time
+	Pane                         PaneInfo
+}
+
+// TimerMap holds one timer's fields; the exec timer coder reads and writes it as FullValue.Elm.
+type TimerMap struct {
+	Key, Tag                     string     // user key and dynamic timer tag
+	Windows                      []Window   // the timer's windows, encoded with the window coder
+	Clear                        bool       // when set, the timestamps and pane are not encoded
+	FireTimestamp, HoldTimestamp mtime.Time // only encoded when Clear is false
+	Pane                         PaneInfo   // only encoded when Clear is false
+}
+
 // KV, Nullable, CoGBK, WindowedValue represent composite generic types. They are not used
 // directly in user code signatures, but only in FullTypes.
 
diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
index 69d59d201c4..2a8476fe65a 100644
--- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go
+++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
@@ -34,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/google/go-cmp/cmp"
@@ -43,7 +44,6 @@ import (
 
 var unimplementedCoders = map[string]bool{
 	"beam:coder:param_windowed_value:v1": true,
-	"beam:coder:timer:v1":                true,
 	"beam:coder:sharded_key:v1":          true,
 	"beam:coder:custom_window:v1":        true,
 }
@@ -294,7 +294,9 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
 		if !diff(c.Components[1], elem, vs[3]) {
 			pass = false
 		}
-		// TODO compare pane information.
+		if !diffPane(vs[2].Value, elem.Pane) {
+			pass = false
+		}
 		return pass
 	case "beam:coder:row:v1":
 		fs := eg.Value.(yaml.MapSlice)
@@ -320,6 +322,48 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
 		}
 
 		got, want = elem.Elm, rv.Interface()
+	case "beam:coder:timer:v1":
+		pass := true
+		tm := elem.Elm.(typex.TimerMap)
+		fs := eg.Value.(yaml.MapSlice)
+		for _, item := range fs {
+			// Only fields present in the example are compared.
+			switch item.Key.(string) {
+			case "userKey":
+				if want := item.Value.(string); want != tm.Key {
+					pass = false
+				}
+			case "dynamicTimerTag":
+				if want := item.Value.(string); want != tm.Tag {
+					pass = false
+				}
+			case "windows":
+				if v, ok := item.Value.([]interface{}); ok {
+					pass = pass && len(v) == len(tm.Windows) // differing window counts fail the comparison
+					for i := 0; i < len(v) && i < len(tm.Windows); i++ {
+						want, got := v[i].(string), fmt.Sprintf("%s", tm.Windows[i])
+						if got != want && !(want == "global" && got == "[*]") { // "global" matches the global window, printed as [*]
+							pass = false
+						}
+					}
+				}
+			case "clearBit":
+				if want := item.Value.(bool); want != tm.Clear {
+					pass = false
+				}
+			case "fireTimestamp":
+				if want := item.Value.(int); want != int(tm.FireTimestamp) {
+					pass = false
+				}
+			case "holdTimestamp":
+				if want := item.Value.(int); want != int(tm.HoldTimestamp) {
+					pass = false
+				}
+			case "pane":
+				pass = diffPane(item.Value, tm.Pane) && pass // keep mismatches found on earlier keys
+			}
+		}
+		return pass
 	default:
 		got, want = elem.Elm, eg.Value
 	}
@@ -330,6 +374,47 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
 	return true
 }

+// diffPane reports whether got matches the pane from a standard_coders.yaml
+// example, given as a yaml.MapSlice that may hold the keys is_first, is_last,
+// timing, index and on_time_index. Keys absent from the example, and any other
+// keys, are not compared.
+func diffPane(eg interface{}, got typex.PaneInfo) bool {
+	pass := true
+	// Timing names as spelled by the Java/Python PaneInfo timing enums (ON_TIME,
+	// not ONTIME); presumably the spelling used in the shared YAML — TODO confirm.
+	paneTiming := map[typex.PaneTiming]string{
+		typex.PaneUnknown: "UNKNOWN",
+		typex.PaneEarly:   "EARLY",
+		typex.PaneLate:    "LATE",
+		typex.PaneOnTime:  "ON_TIME",
+	}
+	for _, item := range eg.(yaml.MapSlice) {
+		switch item.Key.(string) {
+		case "is_first":
+			if want := item.Value.(bool); want != got.IsFirst {
+				pass = false
+			}
+		case "is_last":
+			if want := item.Value.(bool); want != got.IsLast {
+				pass = false
+			}
+		case "timing":
+			if want := item.Value.(string); want != paneTiming[got.Timing] {
+				pass = false
+			}
+		case "index":
+			if want := item.Value.(int); want != int(got.Index) {
+				pass = false
+			}
+		case "on_time_index":
+			if want := item.Value.(int); want != int(got.NonSpeculativeIndex) {
+				pass = false
+			}
+		}
+	}
+	return pass
+}
+
 // standard_coders.yaml uses the name for type indication, except for nullability.
 var nameToType = map[string]reflect.Type{
 	"str":     reflectx.String,