You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/24 02:27:01 UTC

[GitHub] [beam] youngoli commented on a change in pull request #13611: [BEAM-9615] Custom Schema Coder Support

youngoli commented on a change in pull request #13611:
URL: https://github.com/apache/beam/pull/13611#discussion_r548338119



##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// RowDecoderBuilder allows one to build Beam Schema row encoders for provided types.
+type RowDecoderBuilder struct {
+	allFuncs   map[reflect.Type]decoderProvider
+	ifaceFuncs []reflect.Type
+}
+
+type decoderProvider = func(reflect.Type) (func(io.Reader) (interface{}, error), error)

Review comment:
       What's the purpose of having `allFuncs` contain `decoderProvider` instead of `func(io.Reader) (interface{}, error)` directly? Is it to allow for interface types? That is, would the `decoderProvider` for an interface type return different decoders depending on the concrete type passed to it?

##########
File path: sdks/go/pkg/beam/core/graph/coder/row_encoder.go
##########
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type encoderProvider = func(reflect.Type) (func(interface{}, io.Writer) error, error)
+
+// RowEncoderBuilder allows one to build Beam Schema row encoders for provided types.
+type RowEncoderBuilder struct {
+	allFuncs   map[reflect.Type]encoderProvider
+	ifaceFuncs []reflect.Type
+}
+
+// Register accepts a provider for the given type to schema encode values of that type.
+//
+// When generating encoding functions, this builder will first check for exact type
+// matches, then against interfaces with registered factories in recency order of
+// registration, and then finally use the default Beam Schema encoding behavior.
+//
+// TODO(BEAM-9615): Add final factory types. This interface is subject to change.
+// Currently f must be a function of the type func(reflect.Type) func(T, io.Writer) (error).
+func (b *RowEncoderBuilder) Register(rt reflect.Type, f interface{}) {
+	fe, ok := f.(encoderProvider)
+	if !ok {
+		panic(fmt.Sprintf("%v isn't a supported decoder function type (passed with %T)", f, rt))

Review comment:
       ```suggestion
   		panic(fmt.Sprintf("%v isn't a supported encoder function type (passed with %T)", f, rt))
   ```

##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil_test.go
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutil
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+type UserInterface interface {
+	mark()
+}
+
+type UserType1 struct {
+	A string
+	B int
+	C string
+}
+
+func (UserType1) mark() {}
+
+func ut1EncDropB(val interface{}, w io.Writer) error {
+	if err := coder.WriteSimpleRowHeader(2, w); err != nil {
+		return err
+	}
+	elm := val.(UserType1)
+	if err := coder.EncodeStringUTF8(elm.A, w); err != nil {
+		return err
+	}
+	if err := coder.EncodeStringUTF8(elm.C, w); err != nil {
+		return err
+	}
+	return nil
+}
+
+func ut1DecDropB(r io.Reader) (interface{}, error) {
+	if err := coder.ReadSimpleRowHeader(2, r); err != nil {
+		return nil, err
+	}
+	a, err := coder.DecodeStringUTF8(r)
+	if err != nil {
+		return nil, fmt.Errorf("decoding string field A: %w", err)
+	}
+	c, err := coder.DecodeStringUTF8(r)
+	if err != nil {
+		return nil, fmt.Errorf("decoding string field C: %v, %w", c, err)
+	}
+	return UserType1{
+		A: a,
+		B: 42,
+		C: c,
+	}, nil
+}
+
+// TestValidateCoder_SingleValue checks that the validate coder fun will
+func TestValidateCoder(t *testing.T) {
+	// Validates a custom UserType1 encoding, which drops encoding the "B" field,
+	// always setting it to a constant value.
+	t.Run("SingleValue", func(t *testing.T) {
+		(&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserType1)(nil)).Elem(),
+			func(reflect.Type) (func(interface{}, io.Writer) error, error) { return ut1EncDropB, nil },
+			func(reflect.Type) (func(io.Reader) (interface{}, error), error) { return ut1DecDropB, nil },
+			struct{ A, C string }{},
+			UserType1{
+				A: "cats",
+				B: 42,
+				C: "pjamas",
+			},
+		)
+	})
+	t.Run("SliceOfValues", func(t *testing.T) {
+		(&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserType1)(nil)).Elem(),
+			func(reflect.Type) (func(interface{}, io.Writer) error, error) { return ut1EncDropB, nil },
+			func(reflect.Type) (func(io.Reader) (interface{}, error), error) { return ut1DecDropB, nil },
+			struct{ A, C string }{},
+			[]UserType1{
+				{
+					A: "cats",
+					B: 42,
+					C: "pjamas",
+				}, {
+					A: "dogs",
+					B: 42,
+					C: "breakfast",
+				}, {
+					A: "fish",
+					B: 42,
+					C: "plenty of",
+				},
+			},
+		)
+	})
+	t.Run("InterfaceCoder", func(t *testing.T) {
+		(&SchemaCoder{}).Validate(t, reflect.TypeOf((*UserInterface)(nil)).Elem(),
+			func(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+				return ut1EncDropB, nil
+			},
+			func(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+				return ut1DecDropB, nil
+			},
+			struct{ A, C string }{},
+			UserType1{
+				A: "cats",
+				B: 42,
+				C: "pjamas",
+			},
+		)
+	})
+}

Review comment:
       Suggestion: You could probably add a test for the failure case, though you'll need to do some refactoring of the `Validate` function. Basically, have an inner function that is the current implementation but, instead of using `*testing.T`, returns an error if something went wrong; then make the current exported `Validate` function call it and do `t.Fatalf` if an error is returned. With that refactor, you can add a test that calls the inner function with an element that doesn't have 42 for `B`, so you can validate that it fails when expected.

##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil.go
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package testutil contains helpers to test and validate custom Beam Schema coders.
+package testutil
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/google/go-cmp/cmp"
+)
+
+// SchemaCoder helps validate custom schema coders.
+type SchemaCoder struct {
+	encBldUT, encBldSchema coder.RowEncoderBuilder
+	decBldUT, decBldSchema coder.RowDecoderBuilder
+
+	// CmpOptions to pass into the round trip comparison
+	CmpOptions cmp.Options
+}
+
+// Register adds additional custom types not under test to both the under test
+// and default schema coders.
+func (v *SchemaCoder) Register(rt reflect.Type, encF, decF interface{}) {
+	v.encBldUT.Register(rt, encF)
+	v.encBldSchema.Register(rt, encF)
+	v.decBldUT.Register(rt, decF)
+	v.decBldSchema.Register(rt, decF)
+}
+
+// Validate is a test utility to validate custom schema coders generate
+// beam schema encoded bytes.
+//
+// Validate accepts the reflect.Type to register, factory functions for encoding and decoding, an
+// anonymous struct type equivalent to the encoded format produced and consumed by the factory produced functions
+// and test values. Values must be a single struct or pointer to struct.

Review comment:
       This line reads like the parameter `values` shouldn't be a slice, but if I'm reading the code correctly it's supposed to mean that each element in `values` must be a single struct or pointer to struct. I recommend rephrasing it along these lines:
   
   > Values must be either a struct, pointer to struct, or a slice where each element is a struct or pointer to struct.

##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil_test.go
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutil
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+)
+
+type UserInterface interface {
+	mark()
+}
+
+type UserType1 struct {
+	A string
+	B int
+	C string
+}
+
+func (UserType1) mark() {}
+
+func ut1EncDropB(val interface{}, w io.Writer) error {
+	if err := coder.WriteSimpleRowHeader(2, w); err != nil {
+		return err
+	}
+	elm := val.(UserType1)
+	if err := coder.EncodeStringUTF8(elm.A, w); err != nil {
+		return err
+	}
+	if err := coder.EncodeStringUTF8(elm.C, w); err != nil {
+		return err
+	}
+	return nil
+}
+
+func ut1DecDropB(r io.Reader) (interface{}, error) {
+	if err := coder.ReadSimpleRowHeader(2, r); err != nil {
+		return nil, err
+	}
+	a, err := coder.DecodeStringUTF8(r)
+	if err != nil {
+		return nil, fmt.Errorf("decoding string field A: %w", err)
+	}
+	c, err := coder.DecodeStringUTF8(r)
+	if err != nil {
+		return nil, fmt.Errorf("decoding string field C: %v, %w", c, err)
+	}
+	return UserType1{
+		A: a,
+		B: 42,
+		C: c,
+	}, nil
+}
+
+// TestValidateCoder_SingleValue checks that the validate coder fun will

Review comment:
       Typo: Unfinished comment.

##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// RowDecoderBuilder allows one to build Beam Schema row encoders for provided types.
+type RowDecoderBuilder struct {
+	allFuncs   map[reflect.Type]decoderProvider
+	ifaceFuncs []reflect.Type
+}
+
+type decoderProvider = func(reflect.Type) (func(io.Reader) (interface{}, error), error)
+
+// Register accepts a provider to decode schema encoded values
+// of that type.
+//
+// When decoding values, decoder functions produced by this builder will
+// first check for exact type matches, then interfaces implemented by
+// the type in recency order of registration, and then finally the
+// default Beam Schema encoding behavior.
+//
+// TODO(BEAM-9615): Add final factory types. This interface is subject to change.
+// Currently f must be a function  func(reflect.Type) (func(io.Reader) (interface{}, error), error)
+func (b *RowDecoderBuilder) Register(rt reflect.Type, f interface{}) {
+	fd, ok := f.(decoderProvider)
+	if !ok {
+		panic(fmt.Sprintf("%v isn't a supported decoder function type (passed with %T)", f, rt))
+	}
+
+	if rt.Kind() == reflect.Interface && rt.NumMethod() == 0 {
+		panic(fmt.Sprintf("interface type %v must have methods", rt))
+	}
+
+	if b.allFuncs == nil {
+		b.allFuncs = make(map[reflect.Type]decoderProvider)
+	}
+	b.allFuncs[rt] = fd
+	if rt.Kind() == reflect.Interface {
+		b.ifaceFuncs = append(b.ifaceFuncs, rt)
+	}
+}
+
+// Build constructs a Beam Schema coder for the given type, using any providers registered for
+// itself or it's fields.
+func (b *RowDecoderBuilder) Build(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return b.decoderForType(rt), nil
+}
+
+// decoderForType returns a decoder function for the struct or pointer to struct type.
+func (b *RowDecoderBuilder) decoderForType(t reflect.Type) func(io.Reader) (interface{}, error) {
+	// Check if there are any providers registered for this type, or that this type adheres to any interfaces.
+	if f := b.customFunc(t); f != nil {
+		return f
+	}
+
+	var isPtr bool
+	// Pointers become the value type for decomposition.
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	dec := b.decoderForStructReflect(t)
+
+	if isPtr {
+		return func(r io.Reader) (interface{}, error) {
+			rv := reflect.New(t)
+			err := dec(rv.Elem(), r)
+			return rv.Interface(), err
+		}
+	}
+	return func(r io.Reader) (interface{}, error) {
+		rv := reflect.New(t)
+		err := dec(rv.Elem(), r)
+		return rv.Elem().Interface(), err
+	}
+}
+
+// decoderForStructReflect returns a reflection based decoder function for the
+// given struct type.
+func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	var coder typeDecoderReflect
+	for i := 0; i < t.NumField(); i++ {
+		i := i // avoid alias issues in the closures.
+		dec := b.decoderForSingleTypeReflect(t.Field(i).Type)
+		coder.fields = append(coder.fields, func(rv reflect.Value, r io.Reader) error {
+			return dec(rv.Field(i), r)
+		})
+	}
+	return func(rv reflect.Value, r io.Reader) error {
+		nf, nils, err := ReadRowHeader(r)
+		if err != nil {
+			return err
+		}
+		if nf != len(coder.fields) {
+			return errors.Errorf("schema[%v] changed: got %d fields, want %d fields", "TODO", nf, len(coder.fields))
+		}
+		for i, f := range coder.fields {
+			if IsFieldNil(nils, i) {
+				continue
+			}
+			if err := f(rv, r); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}
+
+func reflectDecodeBool(rv reflect.Value, r io.Reader) error {
+	v, err := DecodeBool(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding bool field")
+	}
+	rv.SetBool(v)
+	return nil
+}
+
+func reflectDecodeByte(rv reflect.Value, r io.Reader) error {
+	b, err := DecodeByte(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding single byte field")
+	}
+	rv.SetUint(uint64(b))
+	return nil
+}
+
+func reflectDecodeString(rv reflect.Value, r io.Reader) error {
+	v, err := DecodeStringUTF8(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding string field")
+	}
+	rv.SetString(v)
+	return nil
+}
+
+func reflectDecodeInt(rv reflect.Value, r io.Reader) error {
+	v, err := DecodeVarInt(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding varint field")
+	}
+	rv.SetInt(v)
+	return nil
+}
+
+func reflectDecodeFloat(rv reflect.Value, r io.Reader) error {
+	v, err := DecodeDouble(r)
+	if err != nil {
+		return errors.Wrap(err, "error decoding double field")
+	}
+	rv.SetFloat(v)
+	return nil
+}
+
+// customFunc returns nil if no custom func exists for this.
+func (b *RowDecoderBuilder) customFunc(t reflect.Type) func(io.Reader) (interface{}, error) {
+	if fact, ok := b.allFuncs[t]; ok {
+		f, err := fact(t)
+
+		// TODO handle errors?
+		if err != nil {
+			return nil
+		}
+		return f
+	}
+	// Check satisfaction of interface types in reverse registration order.
+	for i := len(b.ifaceFuncs) - 1; i >= 0; i-- {
+		it := b.ifaceFuncs[i]
+		if ok := t.AssignableTo(it); ok {
+			if fact, ok := b.allFuncs[it]; ok {
+				f, err := fact(t)
+				// TODO handle errors?
+				if err != nil {
+					return nil
+				}
+				return f
+			}
+		}
+	}
+	return nil
+}
+
+// decoderForSingleTypeReflect returns a reflection based decoder function for the
+// given type.
+func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	// Check if there are any providers registered for this type, or that this type adheres to any interfaces.
+	if dec := b.customFunc(t); dec != nil {
+		return func(v reflect.Value, r io.Reader) error {
+			elm, err := dec(r)
+			if err != nil {
+				return err
+			}
+			v.Set(reflect.ValueOf(elm))
+			return nil
+		}
+	}
+
+	switch t.Kind() {
+	case reflect.Struct:
+		return b.decoderForStructReflect(t)
+	case reflect.Bool:
+		return reflectDecodeBool
+	case reflect.Uint8:
+		return reflectDecodeByte
+	case reflect.String:
+		return reflectDecodeString
+	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+		return reflectDecodeInt
+	case reflect.Float32, reflect.Float64:
+		return reflectDecodeFloat
+	case reflect.Ptr:
+		decf := b.decoderForSingleTypeReflect(t.Elem())
+		return func(rv reflect.Value, r io.Reader) error {
+			nv := reflect.New(t.Elem())
+			rv.Set(nv)
+			return decf(nv.Elem(), r)
+		}
+	case reflect.Slice:
+		// Special case handling for byte slices.
+		if t.Elem().Kind() == reflect.Uint8 {
+			return func(rv reflect.Value, r io.Reader) error {

Review comment:
       Nit: This seems like a good candidate for a `reflectDecodeBytes` helper function like the others above.

##########
File path: sdks/go/pkg/beam/core/graph/coder/testutil/testutil.go
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package testutil contains helpers to test and validate custom Beam Schema coders.
+package testutil
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/google/go-cmp/cmp"
+)
+
+// SchemaCoder helps validate custom schema coders.
+type SchemaCoder struct {
+	encBldUT, encBldSchema coder.RowEncoderBuilder
+	decBldUT, decBldSchema coder.RowDecoderBuilder
+
+	// CmpOptions to pass into the round trip comparison
+	CmpOptions cmp.Options
+}
+
+// Register adds additional custom types not under test to both the under test
+// and default schema coders.
+func (v *SchemaCoder) Register(rt reflect.Type, encF, decF interface{}) {
+	v.encBldUT.Register(rt, encF)
+	v.encBldSchema.Register(rt, encF)
+	v.decBldUT.Register(rt, decF)
+	v.decBldSchema.Register(rt, decF)
+}
+
+// Validate is a test utility to validate custom schema coders generate
+// beam schema encoded bytes.
+//
+// Validate accepts the reflect.Type to register, factory functions for encoding and decoding, an
+// anonymous struct type equivalent to the encoded format produced and consumed by the factory produced functions
+// and test values. Values must be a single struct or pointer to struct.
+//
+// TODO(lostluck): Improve documentation.
+// TODO(lostluck): Abstract into a configurable struct, to handle
+//
+// Validate will register the under test factories and generate an encoder and decoder function.
+// These functions will be re-used for all test values. This emulates coders being re-used for all
+// elements within a bundle.
+//
+// Validate mutates the SchemaCoderValidator, so the SchemaCoderValidator may not be used more than once.
+func (v *SchemaCoder) Validate(t *testing.T, rt reflect.Type, encF, decF, schema interface{}, values interface{}) {
+	t.Helper()
+	testValues := reflect.ValueOf(values)
+	// Check whether we have a slice type or not.
+	if testValues.Type().Kind() != reflect.Slice {
+		vs := reflect.MakeSlice(reflect.SliceOf(testValues.Type()), 0, 1)
+		testValues = reflect.Append(vs, testValues)
+	}
+	if testValues.Len() == 0 {
+		t.Fatalf("No test values provided for ValidateSchemaCoder(%v)", rt)
+	}
+	// We now have non empty slice of test values!
+
+	v.encBldUT.Register(rt, encF)
+	v.decBldUT.Register(rt, decF)
+
+	testRt := testValues.Type().Elem()
+	encUT, err := v.encBldUT.Build(testRt)
+	if err != nil {
+		t.Fatalf("Unable to build encoder function with given factory: coder.RowEncoderBuilder.Build(%v) = %v, want nil error", rt, err)
+	}
+	decUT, err := v.decBldUT.Build(testRt)
+	if err != nil {
+		t.Fatalf("Unable to build decoder function with given factory: coder.RowDecoderBuilder.Build(%v) = %v, want nil error", rt, err)
+	}
+
+	schemaRt := reflect.TypeOf(schema)
+	encSchema, err := v.encBldSchema.Build(schemaRt)
+	if err != nil {
+		t.Fatalf("Unable to build encoder function for schema equivalent type: coder.RowEncoderBuilder.Build(%v) = %v, want nil error", rt, err)
+	}
+	decSchema, err := v.decBldSchema.Build(schemaRt)
+	if err != nil {
+		t.Fatalf("Unable to build decoder function for schema equivalent type: coder.RowDecoderBuilder.Build(%v) = %v, want nil error", rt, err)
+	}
+	for i := 0; i < testValues.Len(); i++ {
+		t.Run(fmt.Sprintf("%v[%d]", rt, i), func(t *testing.T) {
+			var buf bytes.Buffer
+			want := testValues.Index(i).Interface()
+			if err := encUT(want, &buf); err != nil {
+				t.Fatalf("error calling Under Test encoder[%v](%v) = %v", testRt, want, err)
+			}
+			initialBytes := clone(buf.Bytes())
+
+			bufSchema := bytes.NewBuffer(clone(initialBytes))
+
+			schemaV, err := decSchema(bufSchema)
+			if err != nil {
+				t.Fatalf("error calling Equivalent Schema decoder[%v]() = %v", schemaRt, err)
+			}
+			err = encSchema(schemaV, bufSchema)

Review comment:
       Does reading from the buffer (a few lines above with `decSchema`) remove those bytes from the buffer (or otherwise prevent those bytes from being cloned)? I ask because, on first read, it seems like `roundTripBytes` below will contain the element's bytes twice: the first copy from `initialBytes`, and the second from `encSchema` here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org