You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/03/22 16:52:54 UTC

[beam] branch master updated: [Go SDK] Use the known varint64 coder for int64s.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9fea35  [Go SDK] Use the known varint64 coder for int64s.
     new 3cbbe51  Merge pull request #8114 from lostluck/int64coder
b9fea35 is described below

commit b9fea3503bfc760626812038a74646b0751449b5
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Fri Mar 22 00:21:55 2019 +0000

    [Go SDK] Use the known varint64 coder for int64s.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |  1 +
 sdks/go/pkg/beam/coder.go                          |  5 ++-
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  2 +-
 sdks/go/pkg/beam/core/graph/coder/varint.go        | 12 +++---
 sdks/go/pkg/beam/core/graph/coder/varint_test.go   | 44 +++++++++++++---------
 sdks/go/pkg/beam/core/runtime/exec/coder.go        | 10 ++---
 6 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e081f07..4e819ed 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -550,6 +550,7 @@ message StandardCoders {
     // Components: The key and value coder, in that order.
     KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
 
+    // Variable-length encoding of a 64-bit integer.
     // Components: None
     VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];
 
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 62672e5..58ed115 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -135,7 +135,10 @@ func inferCoder(t FullType) (*coder.Coder, error) {
 	switch t.Class() {
 	case typex.Concrete, typex.Container:
 		switch t.Type() {
-		case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+		case reflectx.Int64:
+			// Use the standard Beam varint coder (beam:coder:varint:v1).
+			return &coder.Coder{Kind: coder.VarInt, T: t}, nil
+		case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
 			c, err := coderx.NewVarIntZ(t.Type())
 			if err != nil {
 				return nil, err
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index a28f8ca..89b5bd9 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -246,7 +246,7 @@ func NewBytes() *Coder {
 
 // NewVarInt returns a new int32 coder using the built-in scheme.
 func NewVarInt() *Coder {
-	return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+	return &Coder{Kind: VarInt, T: typex.New(reflectx.Int64)}
 }
 
 // IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index d96c245..1c93d1d 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -85,16 +85,16 @@ func DecodeVarUint64(r io.Reader) (uint64, error) {
 	}
 }
 
-// EncodeVarInt encodes an int32.
-func EncodeVarInt(value int32, w io.Writer) error {
-	return EncodeVarUint64((uint64)(value)&0xffffffff, w)
+// EncodeVarInt encodes an int64.
+func EncodeVarInt(value int64, w io.Writer) error {
+	return EncodeVarUint64((uint64)(value), w)
 }
 
-// DecodeVarInt decodes an int32.
-func DecodeVarInt(r io.Reader) (int32, error) {
+// DecodeVarInt decodes an int64.
+func DecodeVarInt(r io.Reader) (int64, error) {
 	ret, err := DecodeVarUint64(r)
 	if err != nil {
 		return 0, err
 	}
-	return (int32)(ret), nil
+	return (int64)(ret), nil
 }
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..13acf3a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,6 +17,8 @@ package coder
 
 import (
 	"bytes"
+	"fmt"
+	"math"
 	"testing"
 )
 
@@ -58,11 +60,15 @@ func TestEncodeDecodeVarUint64(t *testing.T) {
 
 func TestEncodeDecodeVarInt(t *testing.T) {
 	tests := []struct {
-		value  int32
+		value  int64
 		length int
 	}{
-		{-2147483648, 5},
-		{-1, 5},
+		{math.MinInt32, 10},
+		{math.MaxInt32, 5},
+		{math.MinInt64, 10},
+		{math.MinInt64 + 234, 10},
+		{math.MaxInt64, 9},
+		{-1, 10},
 		{0, 1},
 		{1, 1},
 		{127, 1},
@@ -71,24 +77,26 @@ func TestEncodeDecodeVarInt(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		var buf bytes.Buffer
+		t.Run(fmt.Sprintf("num:%d,len:%d", test.value, test.length), func(t *testing.T) {
+			var buf bytes.Buffer
 
-		if err := EncodeVarInt(test.value, &buf); err != nil {
-			t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
-		}
+			if err := EncodeVarInt(test.value, &buf); err != nil {
+				t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
+			}
 
-		t.Logf("Encoded %v to %v", test.value, buf.Bytes())
+			t.Logf("Encoded %v to %v", test.value, buf.Bytes())
 
-		if len(buf.Bytes()) != test.length {
-			t.Errorf("EncodeVarInt(%v) = %v, want %v", test.value, len(buf.Bytes()), test.length)
-		}
+			if len(buf.Bytes()) != test.length {
+				t.Errorf("len(EncodeVarInt(%v)) = %v, want %v", test.value, len(buf.Bytes()), test.length)
+			}
 
-		actual, err := DecodeVarInt(&buf)
-		if err != nil {
-			t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
-		}
-		if actual != test.value {
-			t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
-		}
+			actual, err := DecodeVarInt(&buf)
+			if err != nil {
+				t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
+			}
+			if actual != test.value {
+				t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
+			}
+		})
 	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 174e39e..dfe2ac1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -27,7 +27,6 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
@@ -124,7 +123,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) error {
 	}
 	size := len(data)
 
-	if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+	if err := coder.EncodeVarInt((int64)(size), w); err != nil {
 		return err
 	}
 	_, err := w.Write(data)
@@ -151,16 +150,13 @@ type varIntEncoder struct{}
 
 func (*varIntEncoder) Encode(val *FullValue, w io.Writer) error {
 	// Encoding: beam varint
-
-	n := Convert(val.Elm, reflectx.Int32).(int32) // Convert needed?
-	return coder.EncodeVarInt(n, w)
+	return coder.EncodeVarInt(val.Elm.(int64), w)
 }
 
 type varIntDecoder struct{}
 
 func (*varIntDecoder) Decode(r io.Reader) (*FullValue, error) {
 	// Encoding: beam varint
-
 	n, err := coder.DecodeVarInt(r)
 	if err != nil {
 		return nil, err
@@ -184,7 +180,7 @@ func (c *customEncoder) Encode(val *FullValue, w io.Writer) error {
 	// (2) Add length prefix
 
 	size := len(data)
-	if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+	if err := coder.EncodeVarInt((int64)(size), w); err != nil {
 		return err
 	}
 	_, err = w.Write(data)