You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/18 23:37:55 UTC

[beam] branch go-sdk updated: [BEAM-3356] Add Go SDK int and varint custom coders (#4276)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/go-sdk by this push:
     new 14eb2cd  [BEAM-3356] Add Go SDK int and varint custom coders (#4276)
14eb2cd is described below

commit 14eb2cddcd11c73f79555bc8ddbc121e29b10c4d
Author: Henning Rohde <he...@seekerror.org>
AuthorDate: Mon Dec 18 15:37:50 2017 -0800

    [BEAM-3356] Add Go SDK int and varint custom coders (#4276)
    
    * Add Go SDK int and varint custom coders
    
    * CR: [BEAM-3356] Add Go SDK int and varint custom coders
---
 sdks/go/pkg/beam/coder.go                        | 29 +++++++----
 sdks/go/pkg/beam/core/graph/coder/int.go         | 66 ++++++++++++++++++++++++
 sdks/go/pkg/beam/core/graph/coder/varint.go      | 66 ++++++++++++++++++++++--
 sdks/go/pkg/beam/core/graph/coder/varint_test.go | 64 +++++++++++++++++++++++
 4 files changed, 211 insertions(+), 14 deletions(-)

diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 9405573..b81152a 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -85,18 +85,29 @@ func inferCoder(t FullType) (*coder.Coder, error) {
 	switch t.Class() {
 	case typex.Concrete, typex.Container:
 		switch t.Type() {
-		// The type conversions here are very conservative. We handle bytes/strings
-		// equivalently because they are essentially equivalent in the language.
-		// Notably, we do not (currently) support equivalences in numeric data types
-		// due to risks around inadvertent widening or narrowing of data.
+		case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+			c, err := coder.NewVarIntZ(t.Type())
+			if err != nil {
+				return nil, err
+			}
+			return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
+		case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
+			c, err := coder.NewVarUintZ(t.Type())
+			if err != nil {
+				return nil, err
+			}
+			return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
 		case reflectx.String, reflectx.ByteSlice:
+			// We handle bytes/strings equivalently because they are essentially
+			// equivalent in the language. We generally infer exact coders only.
 			return &coder.Coder{Kind: coder.Bytes}, nil
+		default:
+			c, err := newJSONCoder(t.Type())
+			if err != nil {
+				return nil, err
+			}
+			return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
 		}
-		c, err := newJSONCoder(t.Type())
-		if err != nil {
-			return nil, err
-		}
-		return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
 
 	case typex.Composite:
 		c, err := inferCoders(t.Components())
diff --git a/sdks/go/pkg/beam/core/graph/coder/int.go b/sdks/go/pkg/beam/core/graph/coder/int.go
index 336e32b..77f2814 100644
--- a/sdks/go/pkg/beam/core/graph/coder/int.go
+++ b/sdks/go/pkg/beam/core/graph/coder/int.go
@@ -20,8 +20,74 @@ import (
 	"io"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
+var (
+	// Fixed-size custom coders for integers, built in init. Values are encoded as
+	// 4 (32-bit) or 8 (64-bit) big-endian bytes; signed values use their two's-complement bits.
+	Uint32 *CustomCoder
+	Int32  *CustomCoder
+	Uint64 *CustomCoder
+	Int64  *CustomCoder
+)
+
+func init() {
+	var err error
+	Uint32, err = NewCustomCoder("uint32", reflectx.Uint32, encUint32, decUint32)
+	if err != nil {
+		panic(err)
+	}
+	Int32, err = NewCustomCoder("int32", reflectx.Int32, encInt32, decInt32)
+	if err != nil {
+		panic(err)
+	}
+	Uint64, err = NewCustomCoder("uint64", reflectx.Uint64, encUint64, decUint64)
+	if err != nil {
+		panic(err)
+	}
+	Int64, err = NewCustomCoder("int64", reflectx.Int64, encInt64, decInt64)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func encUint32(v uint32) []byte {
+	ret := make([]byte, 4)
+	binary.BigEndian.PutUint32(ret, v)
+	return ret
+}
+
+func decUint32(data []byte) uint32 {
+	return binary.BigEndian.Uint32(data)
+}
+
+func encInt32(v int32) []byte {
+	return encUint32(uint32(v))
+}
+
+func decInt32(data []byte) int32 {
+	return int32(decUint32(data))
+}
+
+func encUint64(v uint64) []byte {
+	ret := make([]byte, 8)
+	binary.BigEndian.PutUint64(ret, v)
+	return ret
+}
+
+func decUint64(data []byte) uint64 {
+	return binary.BigEndian.Uint64(data)
+}
+
+func encInt64(v int64) []byte {
+	return encUint64(uint64(v))
+}
+
+func decInt64(data []byte) int64 {
+	return int64(decUint64(data))
+}
+
 // EncodeUint64 encodes an uint64 in big endian format.
 func EncodeUint64(value uint64, w io.Writer) error {
 	ret := make([]byte, 8)
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 511d176..1eed24f 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,8 +16,14 @@
 package coder
 
 import (
+	"encoding/binary"
 	"errors"
+	"fmt"
 	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 // ErrVarIntTooLong indicates a data corruption issue that needs special
@@ -25,11 +31,55 @@ import (
 // this special handling.
 var ErrVarIntTooLong = errors.New("varint too long")
 
-// Variable-length encoding for integers.
-//
-// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
-// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
-// uint64 (over int64) as the primitive form to get logical bit shifts.
+// NewVarIntZ returns a varint coder for the signed integer type t, or an error for any other kind.
+// It uses the zig-zag scheme of binary.PutVarint, which is _different_ from the Beam standard coding scheme.
+func NewVarIntZ(t reflect.Type) (*CustomCoder, error) {
+	switch t.Kind() {
+	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+		return NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ)
+	default:
+		return nil, fmt.Errorf("not a signed integer type: %v", t)
+	}
+}
+
+// NewVarUintZ returns a varint coder for the unsigned integer type t, or an error for any other kind.
+// It uses the plain unsigned (not zig-zag) varint scheme of binary.PutUvarint.
+func NewVarUintZ(t reflect.Type) (*CustomCoder, error) {
+	switch t.Kind() {
+	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+		return NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ)
+	default:
+		return nil, fmt.Errorf("not an unsigned integer type: %v", t)
+	}
+}
+
+func encVarIntZ(v typex.T) []byte {
+	ret := make([]byte, binary.MaxVarintLen64) // large enough for any int64 varint
+	size := binary.PutVarint(ret, reflect.ValueOf(v).Convert(reflectx.Int64).Interface().(int64))
+	return ret[:size]
+}
+
+func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
+	n, size := binary.Varint(data) // size == 0: input too short; size < 0: value exceeds 64 bits
+	if size <= 0 || reflect.Zero(t).OverflowInt(n) { // reject values that do not fit t instead of silently wrapping
+		return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+	}
+	return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
+
+func encVarUintZ(v typex.T) []byte {
+	ret := make([]byte, binary.MaxVarintLen64) // large enough for any uint64 varint
+	size := binary.PutUvarint(ret, reflect.ValueOf(v).Convert(reflectx.Uint64).Interface().(uint64))
+	return ret[:size]
+}
+
+func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
+	n, size := binary.Uvarint(data) // size == 0: input too short; size < 0: value exceeds 64 bits
+	if size <= 0 || reflect.Zero(t).OverflowUint(n) { // reject values that do not fit t instead of silently wrapping
+		return nil, fmt.Errorf("invalid varuintz encoding for: %v", data)
+	}
+	return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
 
 // EncodeVarUint64 encodes an uint64.
 func EncodeVarUint64(value uint64, w io.Writer) error {
@@ -51,6 +101,12 @@ func EncodeVarUint64(value uint64, w io.Writer) error {
 	}
 }
 
+// Variable-length encoding for integers.
+//
+// Takes between 1 and 10 bytes; less efficient for negative or large numbers.
+// Negative ints are encoded using 5 bytes and negative longs using 10 bytes.
+// We use uint64 (over int64) as the primitive form to get logical bit shifts.
+
 // TODO(herohde) 5/16/2017: figure out whether it's too slow to read one byte
 // at a time here. If not, we may need a more sophisticated reader than
 // io.Reader with lookahead, say.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..0460880 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,7 +17,10 @@ package coder
 
 import (
 	"bytes"
+	"reflect"
 	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 func TestEncodeDecodeVarUint64(t *testing.T) {
@@ -92,3 +95,64 @@ func TestEncodeDecodeVarInt(t *testing.T) {
 		}
 	}
 }
+
+func TestVarIntZ(t *testing.T) {
+	tests := []interface{}{ // small values plus zero and per-type min/max boundaries
+		int(1), int(0),
+		int(-1), int(-1 << 31),
+		int8(8), int8(127),
+		int8(-8), int8(-128),
+		int16(16), int16(32767),
+		int16(-16), int16(-32768),
+		int32(32), int32(2147483647),
+		int32(-32), int32(-2147483648),
+		int64(64), int64(9223372036854775807),
+		int64(-64), int64(-9223372036854775808),
+	}
+
+	for _, v := range tests {
+		typ := reflect.ValueOf(v).Type()
+
+		data := encVarIntZ(v)
+		result, err := decVarIntZ(typ, data)
+		if err != nil {
+			t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+		}
+
+		if v != result {
+			t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+		}
+		resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+		if resultT != typ {
+			t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+		}
+	}
+}
+
+func TestVarUintZ(t *testing.T) {
+	tests := []interface{}{ // small values plus zero and per-type max boundaries
+		uint(1), uint(0),
+		uint8(8), uint8(255),
+		uint16(16), uint16(65535),
+		uint32(32), uint32(4294967295),
+		uint64(64), uint64(18446744073709551615),
+	}
+
+	for _, v := range tests {
+		typ := reflect.ValueOf(v).Type()
+
+		data := encVarUintZ(v)
+		result, err := decVarUintZ(typ, data)
+		if err != nil {
+			t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+		}
+
+		if v != result {
+			t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+		}
+		resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+		if resultT != typ {
+			t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+		}
+	}
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].