You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/22 00:06:14 UTC
[beam] branch master updated: [BEAM-14306] Add unit testing to pane coder (#17370)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 657caa88c01 [BEAM-14306] Add unit testing to pane coder (#17370)
657caa88c01 is described below
commit 657caa88c011cae818377f64f02afe9d18614da5
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Apr 21 20:06:07 2022 -0400
[BEAM-14306] Add unit testing to pane coder (#17370)
---
sdks/go/pkg/beam/core/graph/coder/panes.go | 18 ++-
sdks/go/pkg/beam/core/graph/coder/panes_test.go | 179 ++++++++++++++++++++++++
2 files changed, 190 insertions(+), 7 deletions(-)
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes.go b/sdks/go/pkg/beam/core/graph/coder/panes.go
index 3ccd987e765..bb193c0fe5b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/panes.go
+++ b/sdks/go/pkg/beam/core/graph/coder/panes.go
@@ -16,6 +16,7 @@
package coder
import (
+ "fmt"
"io"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
@@ -28,20 +29,23 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
pane := byte(0)
if v.IsFirst {
- pane |= 0x01
+ pane |= 0x02
}
if v.IsLast {
- pane |= 0x02
+ pane |= 0x01
}
pane |= byte(v.Timing << 2)
switch {
- case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.PaneUnknown:
+ case (v.Index == 0 && v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:
// The entire pane info is encoded as a single byte
paneByte := []byte{pane}
w.Write(paneByte)
case v.Index == v.NonSpeculativeIndex || v.Timing == typex.PaneEarly:
// The pane info is encoded as this byte plus a single VarInt encoded integer
+ if v.Timing == typex.PaneEarly && v.NonSpeculativeIndex != -1 {
+ return fmt.Errorf("error encoding pane %v: non-speculative index value must be equal to -1 if the pane timing is early", v)
+ }
paneByte := []byte{pane | 1<<4}
w.Write(paneByte)
EncodeVarInt(v.Index, w)
@@ -60,11 +64,11 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
func NewPane(b byte) typex.PaneInfo {
pn := typex.NoFiringPane()
- if b&0x01 == 1 {
- pn.IsFirst = true
+ if !(b&0x02 == 2) {
+ pn.IsFirst = false
}
- if b&0x02 == 2 {
- pn.IsLast = true
+ if !(b&0x01 == 1) {
+ pn.IsLast = false
}
pn.Timing = typex.PaneTiming((b >> 2) & 0x03)
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes_test.go b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
new file mode 100644
index 00000000000..49088e7f60c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "bytes"
+ "math"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makePaneInfo(timing typex.PaneTiming, first, last bool, index, nsIndex int64) typex.PaneInfo {
+ return typex.PaneInfo{Timing: timing, IsFirst: first, IsLast: last, Index: index, NonSpeculativeIndex: nsIndex}
+}
+
+func equalPanes(left, right typex.PaneInfo) bool {
+ return (left.Timing == right.Timing) && (left.IsFirst == right.IsFirst) && (left.IsLast == right.IsLast) && (left.Index == right.Index) && (left.NonSpeculativeIndex == right.NonSpeculativeIndex)
+}
+
+func TestPaneCoder(t *testing.T) {
+ tests := []struct {
+ name string
+ timing typex.PaneTiming
+ first bool
+ last bool
+ index int64
+ nsIndex int64
+ }{
+ {
+ "false bools",
+ typex.PaneUnknown,
+ false,
+ false,
+ 0,
+ 0,
+ },
+ {
+ "true bools",
+ typex.PaneUnknown,
+ true,
+ true,
+ 0,
+ 0,
+ },
+ {
+ "first pane",
+ typex.PaneUnknown,
+ true,
+ false,
+ 0,
+ 0,
+ },
+ {
+ "last pane",
+ typex.PaneUnknown,
+ false,
+ true,
+ 0,
+ 0,
+ },
+ {
+ "on time, different index and non-speculative",
+ typex.PaneOnTime,
+ false,
+ false,
+ 1,
+ 2,
+ },
+ {
+ "valid early pane",
+ typex.PaneEarly,
+ true,
+ false,
+ math.MaxInt64,
+ -1,
+ },
+ {
+ "on time, max non-speculative index",
+ typex.PaneOnTime,
+ false,
+ true,
+ 0,
+ math.MaxInt64,
+ },
+ {
+ "late pane, max index",
+ typex.PaneLate,
+ false,
+ false,
+ math.MaxInt64,
+ 0,
+ },
+ {
+ "on time, min non-speculative index",
+ typex.PaneOnTime,
+ false,
+ true,
+ 0,
+ math.MinInt64,
+ },
+ {
+ "late, min index",
+ typex.PaneLate,
+ false,
+ false,
+ math.MinInt64,
+ 0,
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+ var buf bytes.Buffer
+ err := EncodePane(input, &buf)
+ if err != nil {
+ t.Fatalf("failed to encode pane %v, got %v", input, err)
+ }
+ got, err := DecodePane(&buf)
+ if err != nil {
+ t.Fatalf("failed to decode pane from buffer %v, got %v", buf, err)
+ }
+ if want := input; !equalPanes(got, want) {
+ t.Errorf("got pane %v, want %v", got, want)
+ }
+ })
+ }
+}
+
+func TestEncodePane_bad(t *testing.T) {
+ tests := []struct {
+ name string
+ timing typex.PaneTiming
+ first bool
+ last bool
+ index int64
+ nsIndex int64
+ }{
+ {
+ "invalid early pane, max ints",
+ typex.PaneEarly,
+ true,
+ false,
+ math.MaxInt64,
+ math.MaxInt64,
+ },
+ {
+ "invalid early pane, min ints",
+ typex.PaneEarly,
+ true,
+ false,
+ math.MinInt64,
+ math.MinInt64,
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+ var buf bytes.Buffer
+ err := EncodePane(input, &buf)
+ if err == nil {
+ t.Errorf("successfully encoded pane when it should have failed, got %v", buf)
+ }
+ })
+ }
+}