You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/22 22:29:28 UTC

[beam] branch master updated: [Go SDK] Fix side input window coding, & re-enable tests.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b8159f0  [Go SDK] Fix side input window coding, & re-enable tests.
     new fa3af06  Merge pull request #7591 from lostluck/singlewindow
b8159f0 is described below

commit b8159f0e803be6866bd46753df82d4282cdab3fb
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Tue Jan 22 22:04:43 2019 +0000

    [Go SDK] Fix side input window coding, & re-enable tests.
---
 sdks/go/pkg/beam/core/runtime/exec/coder.go | 33 +++++++++++++++++++----------
 sdks/go/test/integration/driver.go          |  4 ++--
 2 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b3a1491..012a77f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -250,13 +250,14 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
 type WindowEncoder interface {
 	// Encode serializes the given value to the writer.
 	Encode([]typex.Window, io.Writer) error
+	EncodeSingle(typex.Window, io.Writer) error
 }
 
 // EncodeWindow is a convenience function for encoding a single window into a
 // byte slice.
 func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error) {
 	var buf bytes.Buffer
-	if err := c.Encode([]typex.Window{w}, &buf); err != nil {
+	if err := c.EncodeSingle(w, &buf); err != nil {
 		return nil, err
 	}
 	return buf.Bytes(), nil
@@ -304,6 +305,10 @@ func (*globalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
 	return coder.EncodeInt32(1, w) // #windows
 }
 
+func (*globalWindowEncoder) EncodeSingle(ws typex.Window, w io.Writer) error {
+	return nil
+}
+
 type globalWindowDecoder struct{}
 
 func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
@@ -313,25 +318,31 @@ func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
 
 type intervalWindowEncoder struct{}
 
-func (*intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
-	// Encoding: upper bound and duration
-
+func (enc *intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
 	if err := coder.EncodeInt32(int32(len(ws)), w); err != nil { // #windows
 		return err
 	}
 	for _, elm := range ws {
-		iw := elm.(window.IntervalWindow)
-		if err := coder.EncodeEventTime(iw.End, w); err != nil {
-			return err
-		}
-		duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
-		if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
-			return err
+		if err := enc.EncodeSingle(elm, w); err != nil {
+			return err // propagate the failure; returning nil would report success after a partial write
 		}
 	}
 	return nil
 }
 
+func (*intervalWindowEncoder) EncodeSingle(elm typex.Window, w io.Writer) error {
+	// Encoding: upper bound and duration
+	iw := elm.(window.IntervalWindow)
+	if err := coder.EncodeEventTime(iw.End, w); err != nil {
+		return err
+	}
+	duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
+	if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
+		return err
+	}
+	return nil
+}
+
 type intervalWindowDecoder struct{}
 
 func (*intervalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
diff --git a/sdks/go/test/integration/driver.go b/sdks/go/test/integration/driver.go
index 8daac8e..92a5529 100644
--- a/sdks/go/test/integration/driver.go
+++ b/sdks/go/test/integration/driver.go
@@ -59,8 +59,8 @@ func main() {
 		{"wordcount:memfs", wordcount.WordCount(old_pond, "+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
 		{"wordcount:kinglear", wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", "7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
 		{"pardo:multioutput", primitives.ParDoMultiOutput()},
-		// BEAM-3286: {"pardo:sideinput", primitives.ParDoSideInput()},
-		// BEAM-3286: {"pardo:kvsideinput", primitives.ParDoKVSideInput()},
+		{"pardo:sideinput", primitives.ParDoSideInput()},
+		{"pardo:kvsideinput", primitives.ParDoKVSideInput()},
 		{"cogbk:cogbk", primitives.CoGBK()},
 		{"flatten:flatten", primitives.Flatten()},
 		// {"flatten:dup", primitives.FlattenDup()},