You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/28 20:50:51 UTC

[beam] branch go-sdk updated: Add builtin varint coder

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/go-sdk by this push:
     new e6f3af2  Add builtin varint coder
e6f3af2 is described below

commit e6f3af22f74bc37ee2e587f0399512ddf237d05a
Author: Henning Rohde <he...@google.com>
AuthorDate: Mon Dec 18 10:51:18 2017 -0800

    Add builtin varint coder
---
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  8 +++++++-
 sdks/go/pkg/beam/core/runtime/exec/coder.go        | 15 +++++++++++++++
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  6 ++++++
 sdks/go/pkg/beam/core/runtime/graphx/coder_test.go |  4 ++++
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  |  7 +++++++
 5 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 7286aed..1c46d7a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -122,7 +122,8 @@ type Kind string
 // documents the usage of coders in the Beam environment.
 const (
 	Custom        Kind = "Custom" // Implicitly length-prefixed
-	Bytes         Kind = "bytes"  // Implicitly length-prefixed
+	Bytes         Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
+	VarInt        Kind = "varint"
 	WindowedValue Kind = "W"
 	KV            Kind = "KV"
 	GBK           Kind = "GBK"
@@ -199,6 +200,11 @@ func NewBytes() *Coder {
 	return &Coder{Kind: Bytes, T: typex.New(reflectx.ByteSlice)}
 }
 
+// NewVarInt returns a new int32 coder using the built-in scheme.
+func NewVarInt() *Coder {
+	return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+}
+
 // Convenience methods to operate through the top-level WindowedValue.
 
 // IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b162082..2253c36 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -78,6 +78,12 @@ func EncodeElement(c *coder.Coder, val FullValue, w io.Writer) error {
 		_, err := w.Write(data)
 		return err
 
+	case coder.VarInt:
+		// Encoding: beam varint
+
+		n := reflectx.UnderlyingType(val.Elm).Convert(reflectx.Int32).Interface().(int32)
+		return coder.EncodeVarInt(n, w)
+
 	case coder.Custom:
 		enc := c.Custom.Enc
 
@@ -135,6 +141,15 @@ func DecodeElement(c *coder.Coder, r io.Reader) (FullValue, error) {
 		}
 		return FullValue{Elm: reflect.ValueOf(data)}, nil
 
+	case coder.VarInt:
+		// Encoding: beam varint
+
+		n, err := coder.DecodeVarInt(r)
+		if err != nil {
+			return FullValue{}, err
+		}
+		return FullValue{Elm: reflect.ValueOf(n)}, nil
+
 	case coder.Custom:
 		dec := c.Custom.Dec
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index bb4b9b7..35301fe 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -136,6 +136,9 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 	case urnBytesCoder:
 		return coder.NewBytes(), nil
 
+	case urnVarIntCoder:
+		return coder.NewVarInt(), nil
+
 	case urnKVCoder:
 		if len(components) != 2 {
 			return nil, fmt.Errorf("bad pair: %v", c)
@@ -298,6 +301,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 		// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
 		return b.internBuiltInCoder(urnBytesCoder)
 
+	case coder.VarInt:
+		return b.internBuiltInCoder(urnVarIntCoder)
+
 	default:
 		panic(fmt.Sprintf("Unexpected coder kind: %v", c.Kind))
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index 8669814..9ca52e7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -47,6 +47,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
 			coder.NewBytes(),
 		},
 		{
+			"varint",
+			coder.NewVarInt(),
+		},
+		{
 			"foo",
 			foo,
 		},
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 8d629b1..b732535 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -720,6 +720,7 @@ type CoderRef struct {
 const (
 	WindowedValueType = "kind:windowed_value"
 	BytesType         = "kind:bytes"
+	VarIntType        = "kind:varint"
 	GlobalWindowType  = "kind:global_window"
 	streamType        = "kind:stream"
 	pairType          = "kind:pair"
@@ -796,6 +797,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 		// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
 		return &CoderRef{Type: BytesType}, nil
 
+	case coder.VarInt:
+		return &CoderRef{Type: VarIntType}, nil
+
 	default:
 		return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
 	}
@@ -807,6 +811,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 	case BytesType:
 		return coder.NewBytes(), nil
 
+	case VarIntType:
+		return coder.NewVarInt(), nil
+
 	case pairType:
 		if len(c.Components) != 2 {
 			return nil, fmt.Errorf("bad pair: %+v", c)

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.