You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/03/22 16:52:54 UTC
[beam] branch master updated: [Go SDK] Use the known varint64 coder for int64s.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9fea35 [Go SDK] Use the known varint64 coder for int64s.
new 3cbbe51 Merge pull request #8114 from lostluck/int64coder
b9fea35 is described below
commit b9fea3503bfc760626812038a74646b0751449b5
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Fri Mar 22 00:21:55 2019 +0000
[Go SDK] Use the known varint64 coder for int64s.
---
.../pipeline/src/main/proto/beam_runner_api.proto | 1 +
sdks/go/pkg/beam/coder.go | 5 ++-
sdks/go/pkg/beam/core/graph/coder/coder.go | 2 +-
sdks/go/pkg/beam/core/graph/coder/varint.go | 12 +++---
sdks/go/pkg/beam/core/graph/coder/varint_test.go | 44 +++++++++++++---------
sdks/go/pkg/beam/core/runtime/exec/coder.go | 10 ++---
6 files changed, 41 insertions(+), 33 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e081f07..4e819ed 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -550,6 +550,7 @@ message StandardCoders {
// Components: The key and value coder, in that order.
KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
+ // Variable-length encoding of a 64-bit integer.
// Components: None
VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 62672e5..58ed115 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -135,7 +135,10 @@ func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
- case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+ case reflectx.Int64:
+ // use the beam varint coder.
+ return &coder.Coder{Kind: coder.VarInt, T: t}, nil
+ case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
c, err := coderx.NewVarIntZ(t.Type())
if err != nil {
return nil, err
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index a28f8ca..89b5bd9 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -246,7 +246,7 @@ func NewBytes() *Coder {
// NewVarInt returns a new int32 coder using the built-in scheme.
func NewVarInt() *Coder {
- return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+ return &Coder{Kind: VarInt, T: typex.New(reflectx.Int64)}
}
// IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index d96c245..1c93d1d 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -85,16 +85,16 @@ func DecodeVarUint64(r io.Reader) (uint64, error) {
}
}
-// EncodeVarInt encodes an int32.
-func EncodeVarInt(value int32, w io.Writer) error {
- return EncodeVarUint64((uint64)(value)&0xffffffff, w)
+// EncodeVarInt encodes an int64.
+func EncodeVarInt(value int64, w io.Writer) error {
+ return EncodeVarUint64((uint64)(value), w)
}
-// DecodeVarInt decodes an int32.
-func DecodeVarInt(r io.Reader) (int32, error) {
+// DecodeVarInt decodes an int64.
+func DecodeVarInt(r io.Reader) (int64, error) {
ret, err := DecodeVarUint64(r)
if err != nil {
return 0, err
}
- return (int32)(ret), nil
+ return (int64)(ret), nil
}
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..13acf3a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,6 +17,8 @@ package coder
import (
"bytes"
+ "fmt"
+ "math"
"testing"
)
@@ -58,11 +60,15 @@ func TestEncodeDecodeVarUint64(t *testing.T) {
func TestEncodeDecodeVarInt(t *testing.T) {
tests := []struct {
- value int32
+ value int64
length int
}{
- {-2147483648, 5},
- {-1, 5},
+ {math.MinInt32, 10},
+ {math.MaxInt32, 5},
+ {math.MinInt64, 10},
+ {math.MinInt64 + 234, 10},
+ {math.MaxInt64, 9},
+ {-1, 10},
{0, 1},
{1, 1},
{127, 1},
@@ -71,24 +77,26 @@ func TestEncodeDecodeVarInt(t *testing.T) {
}
for _, test := range tests {
- var buf bytes.Buffer
+ t.Run(fmt.Sprintf("num:%d,len:%d", test.value, test.length), func(t *testing.T) {
+ var buf bytes.Buffer
- if err := EncodeVarInt(test.value, &buf); err != nil {
- t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
- }
+ if err := EncodeVarInt(test.value, &buf); err != nil {
+ t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
+ }
- t.Logf("Encoded %v to %v", test.value, buf.Bytes())
+ t.Logf("Encoded %v to %v", test.value, buf.Bytes())
- if len(buf.Bytes()) != test.length {
- t.Errorf("EncodeVarInt(%v) = %v, want %v", test.value, len(buf.Bytes()), test.length)
- }
+ if len(buf.Bytes()) != test.length {
+ t.Errorf("len(EncodeVarInt(%v)) = %v, want %v", test.value, len(buf.Bytes()), test.length)
+ }
- actual, err := DecodeVarInt(&buf)
- if err != nil {
- t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
- }
- if actual != test.value {
- t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
- }
+ actual, err := DecodeVarInt(&buf)
+ if err != nil {
+ t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
+ }
+ if actual != test.value {
+ t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
+ }
+ })
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 174e39e..dfe2ac1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -27,7 +27,6 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
- "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
// NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
@@ -124,7 +123,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) error {
}
size := len(data)
- if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+ if err := coder.EncodeVarInt((int64)(size), w); err != nil {
return err
}
_, err := w.Write(data)
@@ -151,16 +150,13 @@ type varIntEncoder struct{}
func (*varIntEncoder) Encode(val *FullValue, w io.Writer) error {
// Encoding: beam varint
-
- n := Convert(val.Elm, reflectx.Int32).(int32) // Convert needed?
- return coder.EncodeVarInt(n, w)
+ return coder.EncodeVarInt(val.Elm.(int64), w)
}
type varIntDecoder struct{}
func (*varIntDecoder) Decode(r io.Reader) (*FullValue, error) {
// Encoding: beam varint
-
n, err := coder.DecodeVarInt(r)
if err != nil {
return nil, err
@@ -184,7 +180,7 @@ func (c *customEncoder) Encode(val *FullValue, w io.Writer) error {
// (2) Add length prefix
size := len(data)
- if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+ if err := coder.EncodeVarInt((int64)(size), w); err != nil {
return err
}
_, err = w.Write(data)