You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2022/08/12 15:18:20 UTC

[beam] branch master updated: [Go SDK]: Implement standalone single-precision float encoder (#22664)

This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a9bb76fe9f [Go SDK]: Implement standalone single-precision float encoder (#22664)
7a9bb76fe9f is described below

commit 7a9bb76fe9f4c167c1d125db9d2cff9a1a315149
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Fri Aug 12 11:18:12 2022 -0400

    [Go SDK]: Implement standalone single-precision float encoder (#22664)
    
    * Implement standalone single-precision float encoder
    
    * Add fuzz test
    
    * Fix buffer size
    
    * Fix docstring
    
    * Add CHANGES.md entry
---
 CHANGES.md                                         |  1 +
 .../pkg/beam/core/graph/coder/coder_fuzz_test.go   | 19 ++++++++
 sdks/go/pkg/beam/core/graph/coder/float.go         | 41 ++++++++++++++++++
 sdks/go/pkg/beam/core/graph/coder/float_test.go    | 50 ++++++++++++++++++++++
 sdks/go/pkg/beam/core/graph/coder/row_decoder.go   | 13 +++++-
 sdks/go/pkg/beam/core/graph/coder/row_encoder.go   |  6 ++-
 6 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 82519483b7f..b8775c11742 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* The Go SDK's Row Coder now encodes float32 fields as single-precision (4-byte) floats instead of double-precision values, to match Java's behavior ([#22629](https://github.com/apache/beam/issues/22629)).
 
 ## Deprecations
 
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
index f0436cd5506..f18fa43f5a4 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go
@@ -65,6 +65,25 @@ func FuzzEncodeDecodeDouble(f *testing.F) {
 	})
 }
 
+// FuzzEncodeDecodeSinglePrecisionFloat checks that EncodeSinglePrecisionFloat
+// followed by DecodeSinglePrecisionFloat reproduces the input bit-for-bit.
+func FuzzEncodeDecodeSinglePrecisionFloat(f *testing.F) {
+	f.Add(float32(3.141))
+	// Seed special values that a tolerance-based comparison would let through.
+	f.Add(float32(math.NaN()))
+	f.Add(float32(math.Inf(1)))
+	f.Add(float32(math.Inf(-1)))
+	f.Add(float32(math.Copysign(0, -1)))
+	f.Fuzz(func(t *testing.T, a float32) {
+		var buf bytes.Buffer
+		if err := EncodeSinglePrecisionFloat(a, &buf); err != nil {
+			// Writing into a bytes.Buffer should not fail, so surface any error
+			// instead of silently skipping the input.
+			t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", a, err)
+		}
+		// Capture the encoded bytes before decoding drains the buffer so that
+		// failure messages can show them.
+		encoded := buf.Bytes()
+
+		actual, err := DecodeSinglePrecisionFloat(&buf)
+		if err != nil {
+			t.Fatalf("DecodeSinglePrecisionFloat(%x) failed: %v", encoded, err)
+		}
+		// The encoding is lossless, so compare exact bit patterns. Under float
+		// comparison NaN never equals itself, -0 equals +0, and Inf-Inf is NaN,
+		// all of which would slip past an absolute-difference check.
+		if math.Float32bits(actual) != math.Float32bits(a) {
+			t.Fatalf("DecodeSinglePrecisionFloat(%x) = %v (bits 0x%08x), want %v (bits 0x%08x)",
+				encoded, actual, math.Float32bits(actual), a, math.Float32bits(a))
+		}
+	})
+}
+
 func FuzzEncodeDecodeUInt64(f *testing.F) {
 	f.Add(uint64(42))
 	f.Fuzz(func(t *testing.T, b uint64) {
diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go
new file mode 100644
index 00000000000..4a311f5cf72
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/float.go
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"encoding/binary"
+	"io"
+	"math"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodeSinglePrecisionFloat writes value to w as its 4-byte IEEE 754
+// single-precision bit pattern, most significant byte first (big endian).
+// Any error from the underlying write is returned unchanged.
+func EncodeSinglePrecisionFloat(value float32, w io.Writer) error {
+	bits := math.Float32bits(value)
+	var out [4]byte
+	binary.BigEndian.PutUint32(out[:], bits)
+	if _, err := ioutilx.WriteUnsafe(w, out[:]); err != nil {
+		return err
+	}
+	return nil
+}
+
+// DecodeSinglePrecisionFloat fills a 4-byte buffer from r via
+// ioutilx.ReadNBufUnsafe and interprets it as a big-endian IEEE 754
+// single-precision bit pattern. On a read error it returns 0 and that error.
+func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) {
+	var in [4]byte
+	err := ioutilx.ReadNBufUnsafe(r, in[:])
+	if err != nil {
+		return 0, err
+	}
+	bits := binary.BigEndian.Uint32(in[:])
+	return math.Float32frombits(bits), nil
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/float_test.go b/sdks/go/pkg/beam/core/graph/coder/float_test.go
new file mode 100644
index 00000000000..645ea51e67d
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/float_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"bytes"
+	"math"
+	"testing"
+)
+
+// TestEncodeDecodeSinglePrecisionFloat round-trips a range of float32 values,
+// including boundary and special values, and pins the encoded byte layout for
+// a few known inputs.
+func TestEncodeDecodeSinglePrecisionFloat(t *testing.T) {
+	var tests []float32
+	for x := float32(-100.0); x <= float32(100.0); x++ {
+		tests = append(tests, 0.1*x)
+	}
+	tests = append(tests,
+		-math.MaxFloat32,
+		math.MaxFloat32,
+		math.SmallestNonzeroFloat32,
+		-math.SmallestNonzeroFloat32,
+		float32(math.Copysign(0, -1)),
+		float32(math.Inf(1)),
+		float32(math.Inf(-1)),
+		float32(math.NaN()),
+	)
+	for _, test := range tests {
+		var buf bytes.Buffer
+		if err := EncodeSinglePrecisionFloat(test, &buf); err != nil {
+			t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", test, err)
+		}
+
+		if got := buf.Len(); got != 4 {
+			t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 4", test, got)
+		}
+
+		actual, err := DecodeSinglePrecisionFloat(&buf)
+		if err != nil {
+			t.Fatalf("DecodeSinglePrecisionFloat(<%v>) failed: %v", test, err)
+		}
+		// Compare bit patterns: NaN != NaN and -0 == +0 under float equality,
+		// so `actual != test` could not verify those cases.
+		if math.Float32bits(actual) != math.Float32bits(test) {
+			t.Errorf("DecodeSinglePrecisionFloat(<%v>) = %v, want %v", test, actual, test)
+		}
+	}
+
+	// Pin the wire format: 4-byte big-endian IEEE 754 bits, the layout this
+	// coder is meant to share with Java (#22629).
+	wireTests := []struct {
+		value float32
+		want  []byte
+	}{
+		{1.0, []byte{0x3f, 0x80, 0x00, 0x00}},
+		{-2.0, []byte{0xc0, 0x00, 0x00, 0x00}},
+		{0.1, []byte{0x3d, 0xcc, 0xcc, 0xcd}},
+		{float32(math.Inf(1)), []byte{0x7f, 0x80, 0x00, 0x00}},
+		{math.MaxFloat32, []byte{0x7f, 0x7f, 0xff, 0xff}},
+	}
+	for _, tc := range wireTests {
+		var buf bytes.Buffer
+		if err := EncodeSinglePrecisionFloat(tc.value, &buf); err != nil {
+			t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", tc.value, err)
+		}
+		if !bytes.Equal(buf.Bytes(), tc.want) {
+			t.Errorf("EncodeSinglePrecisionFloat(%v) = %x, want %x", tc.value, buf.Bytes(), tc.want)
+		}
+	}
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
index 9688ed9876c..689f687af14 100644
--- a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go
@@ -241,6 +241,15 @@ func reflectDecodeUint(rv reflect.Value, r io.Reader) error {
 	return nil
 }
 
+// reflectDecodeSinglePrecisionFloat decodes one single-precision float from r
+// and stores it into rv via SetFloat; rv must therefore be a settable value of
+// a float kind. Decode errors are wrapped with field context.
+func reflectDecodeSinglePrecisionFloat(rv reflect.Value, r io.Reader) error {
+	f32, err := DecodeSinglePrecisionFloat(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding single-precision float field")
+	}
+	// reflect only exposes a float64 setter; widening a float32 is exact, and
+	// SetFloat narrows back to the field's own float kind.
+	rv.SetFloat(float64(f32))
+	return nil
+}
+
 func reflectDecodeFloat(rv reflect.Value, r io.Reader) error {
 	v, err := DecodeDouble(r)
 	if err != nil {
@@ -336,7 +345,9 @@ func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) (typeDec
 		return typeDecoderFieldReflect{decode: reflectDecodeInt}, nil
 	case reflect.Uint, reflect.Uint64, reflect.Uint32, reflect.Uint16:
 		return typeDecoderFieldReflect{decode: reflectDecodeUint}, nil
-	case reflect.Float32, reflect.Float64:
+	case reflect.Float32:
+		return typeDecoderFieldReflect{decode: reflectDecodeSinglePrecisionFloat}, nil
+	case reflect.Float64:
 		return typeDecoderFieldReflect{decode: reflectDecodeFloat}, nil
 	case reflect.Ptr:
 		decf, err := b.decoderForSingleTypeReflect(t.Elem())
diff --git a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
index cfc1a8e51a3..dc41890b004 100644
--- a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go
@@ -208,7 +208,11 @@ func (b *RowEncoderBuilder) encoderForSingleTypeReflect(t reflect.Type) (typeEnc
 		return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
 			return EncodeVarUint64(rv.Uint(), w)
 		}}, nil
-	case reflect.Float32, reflect.Float64:
+	case reflect.Float32:
+		return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
+			return EncodeSinglePrecisionFloat(float32(rv.Float()), w)
+		}}, nil
+	case reflect.Float64:
 		return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error {
 			return EncodeDouble(rv.Float(), w)
 		}}, nil