You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2022/08/12 15:18:20 UTC
[beam] branch master updated: [Go SDK]: Implement standalone single-precision float encoder (#22664)
This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9bb76fe9f [Go SDK]: Implement standalone single-precision float encoder (#22664)
7a9bb76fe9f is described below
commit 7a9bb76fe9f4c167c1d125db9d2cff9a1a315149
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Fri Aug 12 11:18:12 2022 -0400
[Go SDK]: Implement standalone single-precision float encoder (#22664)
* Implement standalone single-precision float encoder
* Add fuzz test
* Fix buffer size
* Fix docstring
* Add CHANGES.md entry
---
CHANGES.md | 1 +
.../pkg/beam/core/graph/coder/coder_fuzz_test.go | 19 ++++++++
sdks/go/pkg/beam/core/graph/coder/float.go | 41 ++++++++++++++++++
sdks/go/pkg/beam/core/graph/coder/float_test.go | 50 ++++++++++++++++++++++
sdks/go/pkg/beam/core/graph/coder/row_decoder.go | 13 +++++-
sdks/go/pkg/beam/core/graph/coder/row_encoder.go | 6 ++-
6 files changed, 128 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 82519483b7f..b8775c11742 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
## Breaking Changes
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* The Go SDK's Row Coder now uses a different single-precision float encoding for float32 types to match Java's behavior ([#22629](https://github.com/apache/beam/issues/22629)).
## Deprecations
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
index f0436cd5506..f18fa43f5a4 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
@@ -65,6 +65,25 @@ func FuzzEncodeDecodeDouble(f *testing.F) {
})
}
+func FuzzEncodeDecodeSinglePrecisionFloat(f *testing.F) { // fuzzes an encode/decode round trip of float32 values
+ f.Add(float32(3.141))
+ f.Fuzz(func(t *testing.T, a float32) {
+ var buf bytes.Buffer
+ err := EncodeSinglePrecisionFloat(a, &buf)
+ if err != nil {
+ return // inputs that fail to encode are skipped, not reported as failures
+ }
+
+ actual, err := DecodeSinglePrecisionFloat(&buf)
+ if err != nil {
+ t.Fatalf("DecodeSinglePrecisionFloat(%v) failed: %v", &buf, err)
+ }
+ if math.Abs(float64(actual-a)) > floatPrecision { // NOTE(review): a NaN difference (NaN or Inf input) never compares greater, so such inputs cannot fail here
+ t.Fatalf("got %f, want %f +/- %f", actual, a, floatPrecision)
+ }
+ })
+}
+
func FuzzEncodeDecodeUInt64(f *testing.F) {
f.Add(uint64(42))
f.Fuzz(func(t *testing.T, b uint64) {
diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go
new file mode 100644
index 00000000000..4a311f5cf72
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/float.go
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "encoding/binary"
+ "io"
+ "math"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodeSinglePrecisionFloat writes value to w as the 4 bytes of its IEEE 754 bit pattern in big endian order and returns any write error.
+func EncodeSinglePrecisionFloat(value float32, w io.Writer) error {
+ var data [4]byte // fixed-size scratch buffer: a float32 bit pattern is exactly 4 bytes
+ binary.BigEndian.PutUint32(data[:], math.Float32bits(value))
+ _, err := ioutilx.WriteUnsafe(w, data[:]) // the first result is discarded; only the error is reported
+ return err
+}
+
+// DecodeSinglePrecisionFloat reads into a 4-byte buffer from r and returns the float32 whose IEEE 754 bit pattern those bytes hold in big endian order.
+func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) {
+ var data [4]byte
+ if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // presumably fills all of data or returns an error — confirm in ioutilx
+ return 0, err // the read error is passed through unwrapped, with a zero value
+ }
+ return math.Float32frombits(binary.BigEndian.Uint32(data[:])), nil
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/float_test.go b/sdks/go/pkg/beam/core/graph/coder/float_test.go
new file mode 100644
index 00000000000..645ea51e67d
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/float_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "bytes"
+ "math"
+ "testing"
+)
+
+func TestEncodeDecodeSinglePrecisionFloat(t *testing.T) { // checks that float32 values survive an encode/decode round trip unchanged
+ var tests []float32
+ for x := float32(-100.0); x <= float32(100.0); x++ { // x steps by 1, so the inputs run from about -10.0 to 10.0 in steps of 0.1
+ tests = append(tests, 0.1*x)
+ }
+ tests = append(tests, -math.MaxFloat32) // largest-magnitude finite float32 values, negative and positive
+ tests = append(tests, math.MaxFloat32)
+ for _, test := range tests {
+ var buf bytes.Buffer
+ if err := EncodeSinglePrecisionFloat(test, &buf); err != nil {
+ t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", test, err)
+ }
+ t.Logf("Encoded %v to %v", test, buf.Bytes())
+
+ if len(buf.Bytes()) != 4 { // every encoded value is expected to occupy exactly 4 bytes
+ t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 4", test, len(buf.Bytes()))
+ }
+
+ actual, err := DecodeSinglePrecisionFloat(&buf)
+ if err != nil {
+ t.Fatalf("DecodeSinglePrecisionFloat(<%v>) failed: %v", test, err)
+ }
+ if actual != test { // exact comparison, no tolerance: the decoded value must equal the input
+ t.Errorf("DecodeSinglePrecisionFloat(<%v>) = %v, want %v", test, actual, test)
+ }
+ }
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
index 9688ed9876c..689f687af14 100644
--- a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
@@ -241,6 +241,15 @@ func reflectDecodeUint(rv reflect.Value, r io.Reader) error {
return nil
}
+func reflectDecodeSinglePrecisionFloat(rv reflect.Value, r io.Reader) error { // decodes one float32 from r and stores it into rv
+ v, err := DecodeSinglePrecisionFloat(r)
+ if err != nil {
+ return errors.Wrap(err, "error decoding single-precision float field")
+ }
+ rv.SetFloat(float64(v)) // reflect's SetFloat takes a float64; widening a float32 to float64 is exact
+ return nil
+}
+
func reflectDecodeFloat(rv reflect.Value, r io.Reader) error {
v, err := DecodeDouble(r)
if err != nil {
@@ -336,7 +345,9 @@ func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) (typeDec
return typeDecoderFieldReflect{decode: reflectDecodeInt}, nil
case reflect.Uint, reflect.Uint64, reflect.Uint32, reflect.Uint16:
return typeDecoderFieldReflect{decode: reflectDecodeUint}, nil
- case reflect.Float32, reflect.Float64:
+ case reflect.Float32:
+ return typeDecoderFieldReflect{decode: reflectDecodeSinglePrecisionFloat}, nil
+ case reflect.Float64:
return typeDecoderFieldReflect{decode: reflectDecodeFloat}, nil
case reflect.Ptr:
decf, err := b.decoderForSingleTypeReflect(t.Elem())
diff --git a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
index cfc1a8e51a3..dc41890b004 100644
--- a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
@@ -208,7 +208,11 @@ func (b *RowEncoderBuilder) encoderForSingleTypeReflect(t reflect.Type) (typeEnc
return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
return EncodeVarUint64(rv.Uint(), w)
}}, nil
- case reflect.Float32, reflect.Float64:
+ case reflect.Float32:
+ return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
+ return EncodeSinglePrecisionFloat(float32(rv.Float()), w)
+ }}, nil
+ case reflect.Float64:
return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
return EncodeDouble(rv.Float(), w)
}}, nil