You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/28 20:50:51 UTC
[beam] branch go-sdk updated: Add builtin varint coder
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/go-sdk by this push:
new e6f3af2 Add builtin varint coder
e6f3af2 is described below
commit e6f3af22f74bc37ee2e587f0399512ddf237d05a
Author: Henning Rohde <he...@google.com>
AuthorDate: Mon Dec 18 10:51:18 2017 -0800
Add builtin varint coder
---
sdks/go/pkg/beam/core/graph/coder/coder.go | 8 +++++++-
sdks/go/pkg/beam/core/runtime/exec/coder.go | 15 +++++++++++++++
sdks/go/pkg/beam/core/runtime/graphx/coder.go | 6 ++++++
sdks/go/pkg/beam/core/runtime/graphx/coder_test.go | 4 ++++
sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 7 +++++++
5 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 7286aed..1c46d7a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -122,7 +122,8 @@ type Kind string
// documents the usage of coders in the Beam environment.
const (
Custom Kind = "Custom" // Implicitly length-prefixed
- Bytes Kind = "bytes" // Implicitly length-prefixed
+ Bytes Kind = "bytes" // Implicitly length-prefixed as part of the encoding
+ VarInt Kind = "varint"
WindowedValue Kind = "W"
KV Kind = "KV"
GBK Kind = "GBK"
@@ -199,6 +200,11 @@ func NewBytes() *Coder {
return &Coder{Kind: Bytes, T: typex.New(reflectx.ByteSlice)}
}
+// NewVarInt returns a new int32 coder that uses the built-in varint encoding.
+func NewVarInt() *Coder {
+ return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+}
+
// Convenience methods to operate through the top-level WindowedValue.
// IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b162082..2253c36 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -78,6 +78,12 @@ func EncodeElement(c *coder.Coder, val FullValue, w io.Writer) error {
_, err := w.Write(data)
return err
+ case coder.VarInt:
+ // Encoding: beam varint
+
+ n := reflectx.UnderlyingType(val.Elm).Convert(reflectx.Int32).Interface().(int32)
+ return coder.EncodeVarInt(n, w)
+
case coder.Custom:
enc := c.Custom.Enc
@@ -135,6 +141,15 @@ func DecodeElement(c *coder.Coder, r io.Reader) (FullValue, error) {
}
return FullValue{Elm: reflect.ValueOf(data)}, nil
+ case coder.VarInt:
+ // Encoding: beam varint
+
+ n, err := coder.DecodeVarInt(r)
+ if err != nil {
+ return FullValue{}, err
+ }
+ return FullValue{Elm: reflect.ValueOf(n)}, nil
+
case coder.Custom:
dec := c.Custom.Dec
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index bb4b9b7..35301fe 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -136,6 +136,9 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
case urnBytesCoder:
return coder.NewBytes(), nil
+ case urnVarIntCoder:
+ return coder.NewVarInt(), nil
+
case urnKVCoder:
if len(components) != 2 {
return nil, fmt.Errorf("bad pair: %v", c)
@@ -298,6 +301,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
return b.internBuiltInCoder(urnBytesCoder)
+ case coder.VarInt:
+ return b.internBuiltInCoder(urnVarIntCoder)
+
default:
panic(fmt.Sprintf("Unexpected coder kind: %v", c.Kind))
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index 8669814..9ca52e7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -47,6 +47,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
coder.NewBytes(),
},
{
+ "varint",
+ coder.NewVarInt(),
+ },
+ {
"foo",
foo,
},
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 8d629b1..b732535 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -720,6 +720,7 @@ type CoderRef struct {
const (
WindowedValueType = "kind:windowed_value"
BytesType = "kind:bytes"
+ VarIntType = "kind:varint"
GlobalWindowType = "kind:global_window"
streamType = "kind:stream"
pairType = "kind:pair"
@@ -796,6 +797,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
return &CoderRef{Type: BytesType}, nil
+ case coder.VarInt:
+ return &CoderRef{Type: VarIntType}, nil
+
default:
return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
}
@@ -807,6 +811,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
case BytesType:
return coder.NewBytes(), nil
+ case VarIntType:
+ return coder.NewVarInt(), nil
+
case pairType:
if len(c.Components) != 2 {
return nil, fmt.Errorf("bad pair: %+v", c)
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].