You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/18 23:37:55 UTC
[beam] branch go-sdk updated: [BEAM-3356] Add Go SDK int and varint
custom coders (#4276)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/go-sdk by this push:
new 14eb2cd [BEAM-3356] Add Go SDK int and varint custom coders (#4276)
14eb2cd is described below
commit 14eb2cddcd11c73f79555bc8ddbc121e29b10c4d
Author: Henning Rohde <he...@seekerror.org>
AuthorDate: Mon Dec 18 15:37:50 2017 -0800
[BEAM-3356] Add Go SDK int and varint custom coders (#4276)
* Add Go SDK int and varint custom coders
* CR: [BEAM-3356] Add Go SDK int and varint custom coders
---
sdks/go/pkg/beam/coder.go | 29 +++++++----
sdks/go/pkg/beam/core/graph/coder/int.go | 66 ++++++++++++++++++++++++
sdks/go/pkg/beam/core/graph/coder/varint.go | 66 ++++++++++++++++++++++--
sdks/go/pkg/beam/core/graph/coder/varint_test.go | 64 +++++++++++++++++++++++
4 files changed, 211 insertions(+), 14 deletions(-)
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 9405573..b81152a 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -85,18 +85,29 @@ func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
- // The type conversions here are very conservative. We handle bytes/strings
- // equivalently because they are essentially equivalent in the language.
- // Notably, we do not (currently) support equivalences in numeric data types
- // due to risks around inadvertent widening or narrowing of data.
+ case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+ c, err := coder.NewVarIntZ(t.Type())
+ if err != nil {
+ return nil, err
+ }
+ return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
+ case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
+ c, err := coder.NewVarUintZ(t.Type())
+ if err != nil {
+ return nil, err
+ }
+ return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case reflectx.String, reflectx.ByteSlice:
+ // We handle bytes/strings equivalently because they are essentially
+ // equivalent in the language. We generally infer exact coders only.
return &coder.Coder{Kind: coder.Bytes}, nil
+ default:
+ c, err := newJSONCoder(t.Type())
+ if err != nil {
+ return nil, err
+ }
+ return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
}
- c, err := newJSONCoder(t.Type())
- if err != nil {
- return nil, err
- }
- return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
case typex.Composite:
c, err := inferCoders(t.Components())
diff --git a/sdks/go/pkg/beam/core/graph/coder/int.go b/sdks/go/pkg/beam/core/graph/coder/int.go
index 336e32b..77f2814 100644
--- a/sdks/go/pkg/beam/core/graph/coder/int.go
+++ b/sdks/go/pkg/beam/core/graph/coder/int.go
@@ -20,8 +20,74 @@ import (
"io"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
+var (
+ // Fixed-size, big-endian custom coders for integers.
+
+ Uint32 *CustomCoder
+ Int32 *CustomCoder
+ Uint64 *CustomCoder
+ Int64 *CustomCoder
+)
+
+func init() {
+ var err error
+ Uint32, err = NewCustomCoder("uint32", reflectx.Uint32, encUint32, decUint32)
+ if err != nil {
+ panic(err)
+ }
+ Int32, err = NewCustomCoder("int32", reflectx.Int32, encInt32, decInt32)
+ if err != nil {
+ panic(err)
+ }
+ Uint64, err = NewCustomCoder("uint64", reflectx.Uint64, encUint64, decUint64)
+ if err != nil {
+ panic(err)
+ }
+ Int64, err = NewCustomCoder("int64", reflectx.Int64, encInt64, decInt64)
+ if err != nil {
+ panic(err)
+ }
+}
+
+func encUint32(v uint32) []byte {
+ ret := make([]byte, 4)
+ binary.BigEndian.PutUint32(ret, v)
+ return ret
+}
+
+func decUint32(data []byte) uint32 {
+ return binary.BigEndian.Uint32(data)
+}
+
+func encInt32(v int32) []byte {
+ return encUint32(uint32(v))
+}
+
+func decInt32(data []byte) int32 {
+ return int32(decUint32(data))
+}
+
+func encUint64(v uint64) []byte {
+ ret := make([]byte, 8)
+ binary.BigEndian.PutUint64(ret, v)
+ return ret
+}
+
+func decUint64(data []byte) uint64 {
+ return binary.BigEndian.Uint64(data)
+}
+
+func encInt64(v int64) []byte {
+ return encUint64(uint64(v))
+}
+
+func decInt64(data []byte) int64 {
+ return int64(decUint64(data))
+}
+
// EncodeUint64 encodes an uint64 in big endian format.
func EncodeUint64(value uint64, w io.Writer) error {
ret := make([]byte, 8)
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 511d176..1eed24f 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,8 +16,14 @@
package coder
import (
+ "encoding/binary"
"errors"
+ "fmt"
"io"
+ "reflect"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
// ErrVarIntTooLong indicates a data corruption issue that needs special
@@ -25,11 +31,55 @@ import (
// this special handling.
var ErrVarIntTooLong = errors.New("varint too long")
-// Variable-length encoding for integers.
-//
-// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
-// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
-// uint64 (over int64) as the primitive form to get logical bit shifts.
+// NewVarIntZ returns a varint coder for the given integer type. It uses a zig-zag scheme,
+// which is _different_ from the Beam standard coding scheme.
+func NewVarIntZ(t reflect.Type) (*CustomCoder, error) {
+ switch t.Kind() {
+ case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+ return NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ)
+ default:
+ return nil, fmt.Errorf("not a signed integer type: %v", t)
+ }
+}
+
+// NewVarUintZ returns a uvarint coder for the given unsigned integer type. It uses the plain
+// uvarint scheme (no zig-zag step), which is _different_ from the Beam standard coding scheme.
+func NewVarUintZ(t reflect.Type) (*CustomCoder, error) {
+ switch t.Kind() {
+ case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+ return NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ)
+ default:
+ return nil, fmt.Errorf("not a unsigned integer type: %v", t)
+ }
+}
+
+func encVarIntZ(v typex.T) []byte {
+ ret := make([]byte, binary.MaxVarintLen64)
+ size := binary.PutVarint(ret, reflect.ValueOf(v).Convert(reflectx.Int64).Interface().(int64))
+ return ret[:size]
+}
+
+func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
+ n, size := binary.Varint(data)
+ if size <= 0 {
+ return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+ }
+ return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
+
+func encVarUintZ(v typex.T) []byte {
+ ret := make([]byte, binary.MaxVarintLen64)
+ size := binary.PutUvarint(ret, reflect.ValueOf(v).Convert(reflectx.Uint64).Interface().(uint64))
+ return ret[:size]
+}
+
+func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
+ n, size := binary.Uvarint(data)
+ if size <= 0 {
+ return nil, fmt.Errorf("invalid varuintz encoding for: %v", data)
+ }
+ return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
// EncodeVarUint64 encodes an uint64.
func EncodeVarUint64(value uint64, w io.Writer) error {
@@ -51,6 +101,12 @@ func EncodeVarUint64(value uint64, w io.Writer) error {
}
}
+// Variable-length encoding for integers.
+//
+// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
+// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
+// uint64 (over int64) as the primitive form to get logical bit shifts.
+
// TODO(herohde) 5/16/2017: figure out whether it's too slow to read one byte
// at a time here. If not, we may need a more sophisticated reader than
// io.Reader with lookahead, say.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..0460880 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,7 +17,10 @@ package coder
import (
"bytes"
+ "reflect"
"testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
)
func TestEncodeDecodeVarUint64(t *testing.T) {
@@ -92,3 +95,64 @@ func TestEncodeDecodeVarInt(t *testing.T) {
}
}
}
+
+func TestVarIntZ(t *testing.T) {
+ tests := []interface{}{
+ int(1),
+ int(-1),
+ int8(8),
+ int8(-8),
+ int16(16),
+ int16(-16),
+ int32(32),
+ int32(-32),
+ int64(64),
+ int64(-64),
+ }
+
+ for _, v := range tests {
+ typ := reflect.ValueOf(v).Type()
+
+ data := encVarIntZ(v)
+ result, err := decVarIntZ(typ, data)
+ if err != nil {
+ t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+ }
+
+ if v != result {
+ t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+ }
+ resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+ if resultT != typ {
+ t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+ }
+ }
+}
+
+func TestVarUintZ(t *testing.T) {
+ tests := []interface{}{
+ uint(1),
+ uint8(8),
+ uint16(16),
+ uint32(32),
+ uint64(64),
+ }
+
+ for _, v := range tests {
+ typ := reflect.ValueOf(v).Type()
+
+ data := encVarUintZ(v)
+ result, err := decVarUintZ(typ, data)
+ if err != nil {
+ t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+ }
+
+ if v != result {
+ t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+ }
+ resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+ if resultT != typ {
+ t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].