You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/22 22:29:28 UTC
[beam] branch master updated: [Go SDK] Fix side input window coding,
& re-enable tests.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b8159f0 [Go SDK] Fix side input window coding, & re-enable tests.
new fa3af06 Merge pull request #7591 from lostluck/singlewindow
b8159f0 is described below
commit b8159f0e803be6866bd46753df82d4282cdab3fb
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Tue Jan 22 22:04:43 2019 +0000
[Go SDK] Fix side input window coding, & re-enable tests.
---
sdks/go/pkg/beam/core/runtime/exec/coder.go | 33 +++++++++++++++++++----------
sdks/go/test/integration/driver.go | 4 ++--
2 files changed, 24 insertions(+), 13 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b3a1491..012a77f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -250,13 +250,14 @@ func (c *kvDecoder) Decode(r io.Reader) (FullValue, error) {
type WindowEncoder interface {
// Encode serializes the given value to the writer.
Encode([]typex.Window, io.Writer) error
+ EncodeSingle(typex.Window, io.Writer) error
}
// EncodeWindow is a convenience function for encoding a single window into a
// byte slice.
func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error) {
var buf bytes.Buffer
- if err := c.Encode([]typex.Window{w}, &buf); err != nil {
+ if err := c.EncodeSingle(w, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
@@ -304,6 +305,10 @@ func (*globalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
return coder.EncodeInt32(1, w) // #windows
}
+func (*globalWindowEncoder) EncodeSingle(ws typex.Window, w io.Writer) error {
+ return nil
+}
+
type globalWindowDecoder struct{}
func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
@@ -313,25 +318,31 @@ func (*globalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
type intervalWindowEncoder struct{}
-func (*intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
- // Encoding: upper bound and duration
-
+func (enc *intervalWindowEncoder) Encode(ws []typex.Window, w io.Writer) error {
if err := coder.EncodeInt32(int32(len(ws)), w); err != nil { // #windows
return err
}
for _, elm := range ws {
- iw := elm.(window.IntervalWindow)
- if err := coder.EncodeEventTime(iw.End, w); err != nil {
- return err
- }
- duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
- if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
- return err
+ if err := enc.EncodeSingle(elm, w); err != nil {
+ return nil
}
}
return nil
}
+func (*intervalWindowEncoder) EncodeSingle(elm typex.Window, w io.Writer) error {
+ // Encoding: upper bound and duration
+ iw := elm.(window.IntervalWindow)
+ if err := coder.EncodeEventTime(iw.End, w); err != nil {
+ return err
+ }
+ duration := iw.End.Milliseconds() - iw.Start.Milliseconds()
+ if err := coder.EncodeVarUint64(uint64(duration), w); err != nil {
+ return err
+ }
+ return nil
+}
+
type intervalWindowDecoder struct{}
func (*intervalWindowDecoder) Decode(r io.Reader) ([]typex.Window, error) {
diff --git a/sdks/go/test/integration/driver.go b/sdks/go/test/integration/driver.go
index 8daac8e..92a5529 100644
--- a/sdks/go/test/integration/driver.go
+++ b/sdks/go/test/integration/driver.go
@@ -59,8 +59,8 @@ func main() {
{"wordcount:memfs", wordcount.WordCount(old_pond, "+Qj8iAnV5BI2A4sbzUbb6Q==", 8)},
{"wordcount:kinglear", wordcount.WordCount("gs://apache-beam-samples/shakespeare/kinglear.txt", "7ZCh5ih9m8IW1w+iS8sRKg==", 4749)},
{"pardo:multioutput", primitives.ParDoMultiOutput()},
- // BEAM-3286: {"pardo:sideinput", primitives.ParDoSideInput()},
- // BEAM-3286: {"pardo:kvsideinput", primitives.ParDoKVSideInput()},
+ {"pardo:sideinput", primitives.ParDoSideInput()},
+ {"pardo:kvsideinput", primitives.ParDoKVSideInput()},
{"cogbk:cogbk", primitives.CoGBK()},
{"flatten:flatten", primitives.Flatten()},
// {"flatten:dup", primitives.FlattenDup()},