You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/08/15 00:02:10 UTC

[beam] 01/01: Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch revert-12554-beam9615schema2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8069793f553fc31c6d4544f4045df8ac619ceb54
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Aug 14 17:01:34 2020 -0700

    Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"
    
    This reverts commit bc0ba7e3e7f907db74de807844d71e817d6a5359.
---
 sdks/go/gogradle.lock                              |  37 +--
 .../pkg/beam/core/runtime/graphx/schema/schema.go  | 250 ++++-----------------
 .../beam/core/runtime/graphx/schema/schema_test.go | 207 +----------------
 3 files changed, 53 insertions(+), 441 deletions(-)

diff --git a/sdks/go/gogradle.lock b/sdks/go/gogradle.lock
index 6754757..c24b7e2 100644
--- a/sdks/go/gogradle.lock
+++ b/sdks/go/gogradle.lock
@@ -52,6 +52,13 @@ dependencies:
       vcs: "git"
     vendorPath: "vendor/github.com/coreos/bbolt"
     transitive: false
+  - urls:
+    - "https://github.com/etcd-io/etcd.git"
+    - "git@github.com:etcd-io/etcd.git"
+    vcs: "git"
+    name: "github.com/etcd-io/etcd"
+    commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921"
+    transitive: false
   - name: "github.com/coreos/go-semver"
     host:
       name: "github.com/etcd-io/etcd"
@@ -138,13 +145,6 @@ dependencies:
     commit: "44cc805cf13205b55f69e14bcb69867d1ae92f98"
     transitive: false
   - urls:
-    - "https://github.com/etcd-io/etcd.git"
-    - "git@github.com:etcd-io/etcd.git"
-    vcs: "git"
-    name: "github.com/etcd-io/etcd"
-    commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921"
-    transitive: false
-  - urls:
     - "https://github.com/fsnotify/fsnotify.git"
     - "git@github.com:fsnotify/fsnotify.git"
     vcs: "git"
@@ -200,7 +200,7 @@ dependencies:
     - "git@github.com:golang/protobuf.git"
     vcs: "git"
     name: "github.com/golang/protobuf"
-    commit: "d04d7b157bb510b1e0c10132224b616ac0e26b17"
+    commit: "ed6926b37a637426117ccab59282c3839528a700"
     transitive: false
   - urls:
     - "https://github.com/golang/snappy.git"
@@ -224,7 +224,7 @@ dependencies:
     - "git@github.com:google/go-cmp.git"
     vcs: "git"
     name: "github.com/google/go-cmp"
-    commit: "9680bfaf28748393e28e00238d94070fb9972fd8"
+    commit: "3af367b6b30c263d47e8895973edcca9a49cf029"
     transitive: false
   - urls:
     - "https://github.com/google/pprof.git"
@@ -701,21 +701,4 @@ dependencies:
       vcs: "git"
     vendorPath: "vendor/gopkg.in/yaml.v2"
     transitive: false
-  test:
-  - urls:
-    - "https://github.com/google/go-cmp.git"
-    - "git@github.com:google/go-cmp.git"
-    vcs: "git"
-    name: "github.com/google/go-cmp"
-    commit: "9680bfaf28748393e28e00238d94070fb9972fd8"
-    transitive: false
-  - vcs: "git"
-    name: "golang.org/x/xerrors"
-    commit: "5ec99f83aff198f5fbd629d6c8d8eb38a04218ca"
-    url: "https://go.googlesource.com/xerrors"
-    transitive: false
-  - vcs: "git"
-    name: "google.golang.org/protobuf"
-    commit: "d165be301fb1e13390ad453281ded24385fd8ebc"
-    url: "https://go.googlesource.com/protobuf"
-    transitive: false
+  test: []
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 40ed972a..ac724b3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -28,169 +28,28 @@ package schema
 import (
 	"fmt"
 	"reflect"
-	"strconv"
 	"strings"
-	"sync/atomic"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-var lastShortID int64
-
-// TODO(BEAM-9615): Replace with UUIDs.
-func getNextID() string {
-	id := atomic.AddInt64(&lastShortID, 1)
-	// No reason not to use the smallest string short ids possible.
-	return strconv.FormatInt(id, 36)
-}
-
-var (
-	// Maps types to schemas for reuse, caching the UUIDs.
-	typeToSchema = map[reflect.Type]*pipepb.Schema{}
-	// Maps synthetic types to user types. Keys must be generated from a schema.
-	// This works around using the generated type assertion shims failing to type assert.
-	// Type assertion isn't assignability, which is closer to how the reflection based
-	// shims operate.
-	// User types are mapped to themselves to also signify they've been registered.
-	syntheticToUser = map[reflect.Type]reflect.Type{}
-)
-
-// Registered returns whether the given type has been registered with
-// the schema package.
-func Registered(ut reflect.Type) bool {
-	_, ok := syntheticToUser[ut]
-	return ok
-}
-
-// RegisterType converts the type to it's schema representation, and converts it back to
-// a synthetic type so we can map from the synthetic type back to the user type.
-// Recursively registers other named struct types in any component parts.
-func RegisterType(ut reflect.Type) {
-	registerType(ut, map[reflect.Type]struct{}{})
-}
-
-func registerType(ut reflect.Type, seen map[reflect.Type]struct{}) {
-	if _, ok := syntheticToUser[ut]; ok {
-		return
-	}
-	if _, ok := seen[ut]; ok {
-		return // already processed in this pass, don't reprocess.
-	}
-	seen[ut] = struct{}{}
-
-	// Lets do some recursion to register fundamental type parts.
-	t := ut
-	switch t.Kind() {
-	case reflect.Map:
-		registerType(t.Key(), seen)
-		fallthrough
-	case reflect.Array, reflect.Slice, reflect.Ptr:
-		registerType(t.Elem(), seen)
-		return
-	case reflect.Struct: // What we expect here.
-	default:
-		return
-	}
-	runtime.RegisterType(ut)
-
-	for i := 0; i < t.NumField(); i++ {
-		sf := ut.Field(i)
-		registerType(sf.Type, seen)
-	}
-
-	schm, err := FromType(ut)
-	if err != nil {
-		panic(errors.WithContextf(err, "converting %v to schema", ut))
-	}
-	synth, err := ToType(schm)
-	if err != nil {
-		panic(errors.WithContextf(err, "converting %v's back to a synthetic type", ut))
-	}
-	synth = reflectx.SkipPtr(synth)
-	ut = reflectx.SkipPtr(ut)
-	syntheticToUser[synth] = ut
-	syntheticToUser[reflect.PtrTo(synth)] = reflect.PtrTo(ut)
-	syntheticToUser[ut] = ut
-	syntheticToUser[reflect.PtrTo(ut)] = reflect.PtrTo(ut)
-}
-
 // FromType returns a Beam Schema of the passed in type.
 // Returns an error if the type cannot be converted to a Schema.
 func FromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
-	}
-	schm, err := structToSchema(ot)
-	if err != nil {
-		return nil, err
-	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
-	}
-	return schm, nil
-}
-
-// Schema Option urns.
-const (
-	// optGoNillable indicates that this top level schema should be returned as a pointer type.
-	optGoNillable = "beam:schema:go:nillable:v1"
-	// optGoInt indicates that this field should be decoded to an int, rather than an int64.
-	optGoInt = "beam:schema:go:int:v1"
-	// Since maps, arrays, and iterables don't have options, we need additional options
-	// to handle plain go integers.
-	optGoIntKey  = "beam:schema:go:intkey:v1"  // For int map keys
-	optGoIntElem = "beam:schema:go:intelem:v1" // For int values for maps,slices, and arrays
-)
-
-func goIntOptions(t reflect.Type) []*pipepb.Option {
-	var opts []*pipepb.Option
-	switch t.Kind() {
-	case reflect.Int:
-		opts = append(opts, &pipepb.Option{
-			Name: optGoInt,
-		})
-	case reflect.Map:
-		if t.Key().Kind() == reflect.Int {
-			opts = append(opts, &pipepb.Option{
-				Name: optGoIntKey,
-			})
-		}
-		fallthrough
-	case reflect.Array, reflect.Slice:
-		if t.Elem().Kind() == reflect.Int {
-			opts = append(opts, &pipepb.Option{
-				Name: optGoIntElem,
-			})
-		}
+	t := ot // keep the original type for errors.
+	// The top level schema for a pointer to struct and the struct is the same.
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
 	}
-	return opts
-}
-
-// nillableFromOptions converts the passed in type to it's pointer version
-// if the option is present. This permits go types to be pointers.
-func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
-	return checkOptions(opts, optGoNillable, reflect.PtrTo(t))
-}
-
-func checkOptions(opts []*pipepb.Option, urn string, rt reflect.Type) reflect.Type {
-	for _, opt := range opts {
-		if opt.GetName() == urn {
-			return rt
-		}
+	if t.Kind() != reflect.Struct {
+		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
 	}
-	return nil
+	return structToSchema(t)
 }
 
-func structToSchema(ot reflect.Type) (*pipepb.Schema, error) {
-	if schm, ok := typeToSchema[ot]; ok {
-		return schm, nil
-	}
-	t := reflectx.SkipPtr(ot)
+func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
 	fields := make([]*pipepb.Field, 0, t.NumField())
 	for i := 0; i < t.NumField(); i++ {
 		f, err := structFieldToField(t.Field(i))
@@ -199,13 +58,9 @@ func structToSchema(ot reflect.Type) (*pipepb.Schema, error) {
 		}
 		fields = append(fields, f)
 	}
-
-	schm := &pipepb.Schema{
+	return &pipepb.Schema{
 		Fields: fields,
-		Id:     getNextID(),
-	}
-	typeToSchema[ot] = schm
-	return schm, nil
+	}, nil
 }
 
 func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
@@ -213,18 +68,17 @@ func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
 	if tag := sf.Tag.Get("beam"); tag != "" {
 		name, _ = parseTag(tag)
 	}
-	ftype, opts, err := reflectTypeToFieldType(sf.Type)
+	ftype, err := reflectTypeToFieldType(sf.Type)
 	if err != nil {
 		return nil, err
 	}
 	return &pipepb.Field{
-		Name:    name,
-		Type:    ftype,
-		Options: opts,
+		Name: name,
+		Type: ftype,
 	}, nil
 }
 
-func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Option, error) {
+func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
 	var isPtr bool
 	t := ot
 	if t.Kind() == reflect.Ptr {
@@ -233,13 +87,13 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 	}
 	switch t.Kind() {
 	case reflect.Map:
-		kt, _, err := reflectTypeToFieldType(t.Key())
+		kt, err := reflectTypeToFieldType(t.Key())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
 		}
-		vt, _, err := reflectTypeToFieldType(t.Elem())
+		vt, err := reflectTypeToFieldType(t.Elem())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
 		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
@@ -249,11 +103,11 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					ValueType: vt,
 				},
 			},
-		}, goIntOptions(t), nil
+		}, nil
 	case reflect.Struct:
 		sch, err := structToSchema(t)
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
 		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
@@ -262,7 +116,7 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					Schema: sch,
 				},
 			},
-		}, nil, nil
+		}, nil
 	case reflect.Slice, reflect.Array:
 		// Special handling for []byte
 		if t == reflectx.ByteSlice {
@@ -271,13 +125,12 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 				TypeInfo: &pipepb.FieldType_AtomicType{
 					AtomicType: pipepb.AtomicType_BYTES,
 				},
-			}, nil, nil
+			}, nil
 		}
-		vt, _, err := reflectTypeToFieldType(t.Elem())
+		vt, err := reflectTypeToFieldType(t.Elem())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
 		}
-		opts := goIntOptions(t)
 		return &pipepb.FieldType{
 			Nullable: isPtr,
 			TypeInfo: &pipepb.FieldType_ArrayType{
@@ -285,9 +138,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					ElementType: vt,
 				},
 			},
-		}, opts, nil
+		}, nil
 	case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
-		return nil, nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
+		return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
 	default: // must be an atomic type
 		if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
 			return &pipepb.FieldType{
@@ -295,9 +148,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 				TypeInfo: &pipepb.FieldType_AtomicType{
 					AtomicType: enum,
 				},
-			}, goIntOptions(t), nil
+			}, nil
 		}
-		return nil, nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
+		return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
 	}
 }
 
@@ -325,32 +178,20 @@ func ToType(s *pipepb.Schema) (reflect.Type, error) {
 		}
 		fields = append(fields, rf)
 	}
-	ret := reflect.StructOf(fields)
-	if ut, ok := syntheticToUser[ret]; ok {
-		ret = ut
-	}
-	if t := nillableFromOptions(s.GetOptions(), ret); t != nil {
-		return t, nil
-	}
-	return ret, nil
+	return reflect.StructOf(fields), nil
 }
 
 func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
 	name := sf.GetName()
-	rt, err := fieldTypeToReflectType(sf.GetType(), sf.Options)
+	rt, err := fieldTypeToReflectType(sf.GetType())
 	if err != nil {
 		return reflect.StructField{}, err
 	}
-
-	rsf := reflect.StructField{
+	return reflect.StructField{
 		Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
 		Type: rt,
-	}
-	// Add a name tag if they don't match.
-	if name != rsf.Name {
-		rsf.Tag = reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name))
-	}
-	return rsf, nil
+		Tag:  reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
+	}, nil
 }
 
 var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
@@ -365,7 +206,7 @@ var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
 	pipepb.AtomicType_BYTES:   reflectx.ByteSlice,
 }
 
-func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (reflect.Type, error) {
+func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
 	var t reflect.Type
 	switch sft.GetTypeInfo().(type) {
 	case *pipepb.FieldType_AtomicType:
@@ -373,35 +214,21 @@ func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (refle
 		if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
 			return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
 		}
-		// Handle duplicate type matchings.
-		if optT := checkOptions(opts, optGoInt, reflectx.Int); optT != nil {
-			t = optT
-		}
 	case *pipepb.FieldType_ArrayType:
-		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType(), nil)
+		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert array element type")
 		}
-		// Handle duplicate type matchings.
-		if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil {
-			rt = optT
-		}
 		t = reflect.SliceOf(rt)
 	case *pipepb.FieldType_MapType:
-		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType(), nil)
+		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert map key type")
 		}
-		if optT := checkOptions(opts, optGoIntKey, reflectx.Int); optT != nil {
-			kt = optT
-		}
-		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType(), nil)
+		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert map value type")
 		}
-		if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil {
-			vt = optT
-		}
 		t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
 	case *pipepb.FieldType_RowType:
 		rt, err := ToType(sft.GetRowType().GetSchema())
@@ -412,9 +239,8 @@ func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (refle
 	// case *pipepb.FieldType_IterableType:
 	// TODO(BEAM-9615): handle IterableTypes.
 
-	//case *pipepb.FieldType_LogicalType:
+	// case *pipepb.FieldType_LogicalType:
 	// TODO(BEAM-9615): handle LogicalTypes types.
-	//sft.GetLogicalType().
 
 	// Logical Types are for things that have more specialized user representation already, or
 	// things like Time or protocol buffers.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index 741b79f..f6b5ff7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -20,32 +20,10 @@ import (
 	"reflect"
 	"testing"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-	"github.com/golang/protobuf/proto"
 	"github.com/google/go-cmp/cmp"
-	"google.golang.org/protobuf/testing/protocmp"
 )
 
-type registeredType struct {
-	A, B string
-	C    bool
-}
-
-type sRegisteredType struct {
-	D int32
-}
-
-type justAType struct {
-	A, B string
-	C    int
-}
-
-func init() {
-	runtime.RegisterType(reflect.TypeOf((*registeredType)(nil)))
-	RegisterType(reflect.TypeOf((*sRegisteredType)(nil)))
-}
-
 func TestSchemaConversion(t *testing.T) {
 	tests := []struct {
 		st *pipepb.Schema
@@ -172,177 +150,6 @@ func TestSchemaConversion(t *testing.T) {
 			rt: reflect.TypeOf(struct {
 				Payloads [][]byte `beam:"payloads"`
 			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "AString",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "AnIntPtr",
-						Type: &pipepb.FieldType{
-							Nullable: true,
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT32,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(struct {
-				AString  string
-				AnIntPtr *int32
-			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "A",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "B",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "C",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_BOOLEAN,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(registeredType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "D",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT32,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(sRegisteredType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "A",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "B",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "C",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT64,
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoInt,
-						}},
-					},
-				},
-			},
-			rt: reflect.TypeOf(justAType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					{
-						Name: "Q",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_MapType{
-								MapType: &pipepb.MapType{
-									KeyType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-									ValueType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-								},
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoIntKey,
-						}, {
-							Name: optGoIntElem,
-						}},
-					}, {
-						Name: "T",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_ArrayType{
-								ArrayType: &pipepb.ArrayType{
-									ElementType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-								},
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoIntElem,
-						}},
-					},
-				},
-			},
-			rt: reflect.TypeOf(struct {
-				Q map[int]int
-				T []int
-			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					{
-						Name: "SuperNES",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT16,
-							},
-						},
-					},
-				},
-				Options: []*pipepb.Option{{
-					Name: optGoNillable,
-				}},
-			},
-			rt: reflect.TypeOf(&struct {
-				SuperNES int16
-			}{}),
 		},
 	}
 
@@ -354,11 +161,9 @@ func TestSchemaConversion(t *testing.T) {
 				if err != nil {
 					t.Fatalf("error ToType(%v) = %v", test.st, err)
 				}
-				if !test.rt.AssignableTo(got) {
-					t.Errorf("%v not assignable to %v", test.rt, got)
-					if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" {
-						t.Errorf("diff (-want, +got): %v", d)
-					}
+
+				if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
 				}
 			}
 			{
@@ -366,10 +171,8 @@ func TestSchemaConversion(t *testing.T) {
 				if err != nil {
 					t.Fatalf("error FromType(%v) = %v", test.rt, err)
 				}
-				if d := cmp.Diff(test.st, got,
-					protocmp.Transform(),
-					protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"),
-				); d != "" {
+
+				if d := cmp.Diff(test.st, got); d != "" {
 					t.Errorf("diff (-want, +got): %v", d)
 				}