You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/22 00:06:14 UTC

[beam] branch master updated: [BEAM-14306] Add unit testing to pane coder (#17370)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 657caa88c01 [BEAM-14306] Add unit testing to pane coder (#17370)
657caa88c01 is described below

commit 657caa88c011cae818377f64f02afe9d18614da5
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Apr 21 20:06:07 2022 -0400

    [BEAM-14306] Add unit testing to pane coder (#17370)
---
 sdks/go/pkg/beam/core/graph/coder/panes.go      |  18 ++-
 sdks/go/pkg/beam/core/graph/coder/panes_test.go | 179 ++++++++++++++++++++++++
 2 files changed, 190 insertions(+), 7 deletions(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/panes.go b/sdks/go/pkg/beam/core/graph/coder/panes.go
index 3ccd987e765..bb193c0fe5b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/panes.go
+++ b/sdks/go/pkg/beam/core/graph/coder/panes.go
@@ -16,6 +16,7 @@
 package coder
 
 import (
+	"fmt"
 	"io"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
@@ -28,20 +29,23 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
 
 	pane := byte(0)
 	if v.IsFirst {
-		pane |= 0x01
+		pane |= 0x02
 	}
 	if v.IsLast {
-		pane |= 0x02
+		pane |= 0x01
 	}
 	pane |= byte(v.Timing << 2)
 
 	switch {
-	case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.PaneUnknown:
+	case (v.Index == 0 && v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:
 		// The entire pane info is encoded as a single byte
 		paneByte := []byte{pane}
 		w.Write(paneByte)
 	case v.Index == v.NonSpeculativeIndex || v.Timing == typex.PaneEarly:
 		// The pane info is encoded as this byte plus a single VarInt encoded integer
+		if v.Timing == typex.PaneEarly && v.NonSpeculativeIndex != -1 {
+			return fmt.Errorf("error encoding pane %v: non-speculative index value must be equal to -1 if the pane timing is early", v)
+		}
 		paneByte := []byte{pane | 1<<4}
 		w.Write(paneByte)
 		EncodeVarInt(v.Index, w)
@@ -60,11 +64,11 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
 func NewPane(b byte) typex.PaneInfo {
 	pn := typex.NoFiringPane()
 
-	if b&0x01 == 1 {
-		pn.IsFirst = true
+	if !(b&0x02 == 2) {
+		pn.IsFirst = false
 	}
-	if b&0x02 == 2 {
-		pn.IsLast = true
+	if !(b&0x01 == 1) {
+		pn.IsLast = false
 	}
 
 	pn.Timing = typex.PaneTiming((b >> 2) & 0x03)
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes_test.go b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
new file mode 100644
index 00000000000..49088e7f60c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"bytes"
+	"math"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makePaneInfo(timing typex.PaneTiming, first, last bool, index, nsIndex int64) typex.PaneInfo {
+	return typex.PaneInfo{Timing: timing, IsFirst: first, IsLast: last, Index: index, NonSpeculativeIndex: nsIndex}
+}
+
+func equalPanes(left, right typex.PaneInfo) bool {
+	return (left.Timing == right.Timing) && (left.IsFirst == right.IsFirst) && (left.IsLast == right.IsLast) && (left.Index == right.Index) && (left.NonSpeculativeIndex == right.NonSpeculativeIndex)
+}
+
+func TestPaneCoder(t *testing.T) {
+	tests := []struct {
+		name    string
+		timing  typex.PaneTiming
+		first   bool
+		last    bool
+		index   int64
+		nsIndex int64
+	}{
+		{
+			"false bools",
+			typex.PaneUnknown,
+			false,
+			false,
+			0,
+			0,
+		},
+		{
+			"true bools",
+			typex.PaneUnknown,
+			true,
+			true,
+			0,
+			0,
+		},
+		{
+			"first pane",
+			typex.PaneUnknown,
+			true,
+			false,
+			0,
+			0,
+		},
+		{
+			"last pane",
+			typex.PaneUnknown,
+			false,
+			true,
+			0,
+			0,
+		},
+		{
+			"on time, different index and non-speculative",
+			typex.PaneOnTime,
+			false,
+			false,
+			1,
+			2,
+		},
+		{
+			"valid early pane",
+			typex.PaneEarly,
+			true,
+			false,
+			math.MaxInt64,
+			-1,
+		},
+		{
+			"on time, max non-speculative index",
+			typex.PaneOnTime,
+			false,
+			true,
+			0,
+			math.MaxInt64,
+		},
+		{
+			"late pane, max index",
+			typex.PaneLate,
+			false,
+			false,
+			math.MaxInt64,
+			0,
+		},
+		{
+			"on time, min non-speculative index",
+			typex.PaneOnTime,
+			false,
+			true,
+			0,
+			math.MinInt64,
+		},
+		{
+			"late, min index",
+			typex.PaneLate,
+			false,
+			false,
+			math.MinInt64,
+			0,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+			var buf bytes.Buffer
+			err := EncodePane(input, &buf)
+			if err != nil {
+				t.Fatalf("failed to encode pane %v, got %v", input, err)
+			}
+			got, err := DecodePane(&buf)
+			if err != nil {
+				t.Fatalf("failed to decode pane from buffer %v, got %v", buf, err)
+			}
+			if want := input; !equalPanes(got, want) {
+				t.Errorf("got pane %v, want %v", got, want)
+			}
+		})
+	}
+}
+
+func TestEncodePane_bad(t *testing.T) {
+	tests := []struct {
+		name    string
+		timing  typex.PaneTiming
+		first   bool
+		last    bool
+		index   int64
+		nsIndex int64
+	}{
+		{
+			"invalid early pane, max ints",
+			typex.PaneEarly,
+			true,
+			false,
+			math.MaxInt64,
+			math.MaxInt64,
+		},
+		{
+			"invalid early pane, min ints",
+			typex.PaneEarly,
+			true,
+			false,
+			math.MinInt64,
+			math.MinInt64,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+			var buf bytes.Buffer
+			err := EncodePane(input, &buf)
+			if err == nil {
+				t.Errorf("successfully encoded pane when it should have failed, got %v", buf)
+			}
+		})
+	}
+}