You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/08/15 00:02:09 UTC

[beam] branch revert-12554-beam9615schema2 created (now 8069793)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch revert-12554-beam9615schema2
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 8069793  Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"

This branch includes the following new commits:

     new 8069793  Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch revert-12554-beam9615schema2
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8069793f553fc31c6d4544f4045df8ac619ceb54
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Aug 14 17:01:34 2020 -0700

    Revert "[BEAM-9615] Map user types to Schema reps. (#12554)"
    
    This reverts commit bc0ba7e3e7f907db74de807844d71e817d6a5359.
---
 sdks/go/gogradle.lock                              |  37 +--
 .../pkg/beam/core/runtime/graphx/schema/schema.go  | 250 ++++-----------------
 .../beam/core/runtime/graphx/schema/schema_test.go | 207 +----------------
 3 files changed, 53 insertions(+), 441 deletions(-)

diff --git a/sdks/go/gogradle.lock b/sdks/go/gogradle.lock
index 6754757..c24b7e2 100644
--- a/sdks/go/gogradle.lock
+++ b/sdks/go/gogradle.lock
@@ -52,6 +52,13 @@ dependencies:
       vcs: "git"
     vendorPath: "vendor/github.com/coreos/bbolt"
     transitive: false
+  - urls:
+    - "https://github.com/etcd-io/etcd.git"
+    - "git@github.com:etcd-io/etcd.git"
+    vcs: "git"
+    name: "github.com/etcd-io/etcd"
+    commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921"
+    transitive: false
   - name: "github.com/coreos/go-semver"
     host:
       name: "github.com/etcd-io/etcd"
@@ -138,13 +145,6 @@ dependencies:
     commit: "44cc805cf13205b55f69e14bcb69867d1ae92f98"
     transitive: false
   - urls:
-    - "https://github.com/etcd-io/etcd.git"
-    - "git@github.com:etcd-io/etcd.git"
-    vcs: "git"
-    name: "github.com/etcd-io/etcd"
-    commit: "11214aa33bf5a47d3d9d8dafe0f6b97237dfe921"
-    transitive: false
-  - urls:
     - "https://github.com/fsnotify/fsnotify.git"
     - "git@github.com:fsnotify/fsnotify.git"
     vcs: "git"
@@ -200,7 +200,7 @@ dependencies:
     - "git@github.com:golang/protobuf.git"
     vcs: "git"
     name: "github.com/golang/protobuf"
-    commit: "d04d7b157bb510b1e0c10132224b616ac0e26b17"
+    commit: "ed6926b37a637426117ccab59282c3839528a700"
     transitive: false
   - urls:
     - "https://github.com/golang/snappy.git"
@@ -224,7 +224,7 @@ dependencies:
     - "git@github.com:google/go-cmp.git"
     vcs: "git"
     name: "github.com/google/go-cmp"
-    commit: "9680bfaf28748393e28e00238d94070fb9972fd8"
+    commit: "3af367b6b30c263d47e8895973edcca9a49cf029"
     transitive: false
   - urls:
     - "https://github.com/google/pprof.git"
@@ -701,21 +701,4 @@ dependencies:
       vcs: "git"
     vendorPath: "vendor/gopkg.in/yaml.v2"
     transitive: false
-  test:
-  - urls:
-    - "https://github.com/google/go-cmp.git"
-    - "git@github.com:google/go-cmp.git"
-    vcs: "git"
-    name: "github.com/google/go-cmp"
-    commit: "9680bfaf28748393e28e00238d94070fb9972fd8"
-    transitive: false
-  - vcs: "git"
-    name: "golang.org/x/xerrors"
-    commit: "5ec99f83aff198f5fbd629d6c8d8eb38a04218ca"
-    url: "https://go.googlesource.com/xerrors"
-    transitive: false
-  - vcs: "git"
-    name: "google.golang.org/protobuf"
-    commit: "d165be301fb1e13390ad453281ded24385fd8ebc"
-    url: "https://go.googlesource.com/protobuf"
-    transitive: false
+  test: []
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 40ed972a..ac724b3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -28,169 +28,28 @@ package schema
 import (
 	"fmt"
 	"reflect"
-	"strconv"
 	"strings"
-	"sync/atomic"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
-var lastShortID int64
-
-// TODO(BEAM-9615): Replace with UUIDs.
-func getNextID() string {
-	id := atomic.AddInt64(&lastShortID, 1)
-	// No reason not to use the smallest string short ids possible.
-	return strconv.FormatInt(id, 36)
-}
-
-var (
-	// Maps types to schemas for reuse, caching the UUIDs.
-	typeToSchema = map[reflect.Type]*pipepb.Schema{}
-	// Maps synthetic types to user types. Keys must be generated from a schema.
-	// This works around using the generated type assertion shims failing to type assert.
-	// Type assertion isn't assignability, which is closer to how the reflection based
-	// shims operate.
-	// User types are mapped to themselves to also signify they've been registered.
-	syntheticToUser = map[reflect.Type]reflect.Type{}
-)
-
-// Registered returns whether the given type has been registered with
-// the schema package.
-func Registered(ut reflect.Type) bool {
-	_, ok := syntheticToUser[ut]
-	return ok
-}
-
-// RegisterType converts the type to it's schema representation, and converts it back to
-// a synthetic type so we can map from the synthetic type back to the user type.
-// Recursively registers other named struct types in any component parts.
-func RegisterType(ut reflect.Type) {
-	registerType(ut, map[reflect.Type]struct{}{})
-}
-
-func registerType(ut reflect.Type, seen map[reflect.Type]struct{}) {
-	if _, ok := syntheticToUser[ut]; ok {
-		return
-	}
-	if _, ok := seen[ut]; ok {
-		return // already processed in this pass, don't reprocess.
-	}
-	seen[ut] = struct{}{}
-
-	// Lets do some recursion to register fundamental type parts.
-	t := ut
-	switch t.Kind() {
-	case reflect.Map:
-		registerType(t.Key(), seen)
-		fallthrough
-	case reflect.Array, reflect.Slice, reflect.Ptr:
-		registerType(t.Elem(), seen)
-		return
-	case reflect.Struct: // What we expect here.
-	default:
-		return
-	}
-	runtime.RegisterType(ut)
-
-	for i := 0; i < t.NumField(); i++ {
-		sf := ut.Field(i)
-		registerType(sf.Type, seen)
-	}
-
-	schm, err := FromType(ut)
-	if err != nil {
-		panic(errors.WithContextf(err, "converting %v to schema", ut))
-	}
-	synth, err := ToType(schm)
-	if err != nil {
-		panic(errors.WithContextf(err, "converting %v's back to a synthetic type", ut))
-	}
-	synth = reflectx.SkipPtr(synth)
-	ut = reflectx.SkipPtr(ut)
-	syntheticToUser[synth] = ut
-	syntheticToUser[reflect.PtrTo(synth)] = reflect.PtrTo(ut)
-	syntheticToUser[ut] = ut
-	syntheticToUser[reflect.PtrTo(ut)] = reflect.PtrTo(ut)
-}
-
 // FromType returns a Beam Schema of the passed in type.
 // Returns an error if the type cannot be converted to a Schema.
 func FromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
-	}
-	schm, err := structToSchema(ot)
-	if err != nil {
-		return nil, err
-	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
-	}
-	return schm, nil
-}
-
-// Schema Option urns.
-const (
-	// optGoNillable indicates that this top level schema should be returned as a pointer type.
-	optGoNillable = "beam:schema:go:nillable:v1"
-	// optGoInt indicates that this field should be decoded to an int, rather than an int64.
-	optGoInt = "beam:schema:go:int:v1"
-	// Since maps, arrays, and iterables don't have options, we need additional options
-	// to handle plain go integers.
-	optGoIntKey  = "beam:schema:go:intkey:v1"  // For int map keys
-	optGoIntElem = "beam:schema:go:intelem:v1" // For int values for maps,slices, and arrays
-)
-
-func goIntOptions(t reflect.Type) []*pipepb.Option {
-	var opts []*pipepb.Option
-	switch t.Kind() {
-	case reflect.Int:
-		opts = append(opts, &pipepb.Option{
-			Name: optGoInt,
-		})
-	case reflect.Map:
-		if t.Key().Kind() == reflect.Int {
-			opts = append(opts, &pipepb.Option{
-				Name: optGoIntKey,
-			})
-		}
-		fallthrough
-	case reflect.Array, reflect.Slice:
-		if t.Elem().Kind() == reflect.Int {
-			opts = append(opts, &pipepb.Option{
-				Name: optGoIntElem,
-			})
-		}
+	t := ot // keep the original type for errors.
+	// The top level schema for a pointer to struct and the struct is the same.
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
 	}
-	return opts
-}
-
-// nillableFromOptions converts the passed in type to it's pointer version
-// if the option is present. This permits go types to be pointers.
-func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
-	return checkOptions(opts, optGoNillable, reflect.PtrTo(t))
-}
-
-func checkOptions(opts []*pipepb.Option, urn string, rt reflect.Type) reflect.Type {
-	for _, opt := range opts {
-		if opt.GetName() == urn {
-			return rt
-		}
+	if t.Kind() != reflect.Struct {
+		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
 	}
-	return nil
+	return structToSchema(t)
 }
 
-func structToSchema(ot reflect.Type) (*pipepb.Schema, error) {
-	if schm, ok := typeToSchema[ot]; ok {
-		return schm, nil
-	}
-	t := reflectx.SkipPtr(ot)
+func structToSchema(t reflect.Type) (*pipepb.Schema, error) {
 	fields := make([]*pipepb.Field, 0, t.NumField())
 	for i := 0; i < t.NumField(); i++ {
 		f, err := structFieldToField(t.Field(i))
@@ -199,13 +58,9 @@ func structToSchema(ot reflect.Type) (*pipepb.Schema, error) {
 		}
 		fields = append(fields, f)
 	}
-
-	schm := &pipepb.Schema{
+	return &pipepb.Schema{
 		Fields: fields,
-		Id:     getNextID(),
-	}
-	typeToSchema[ot] = schm
-	return schm, nil
+	}, nil
 }
 
 func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
@@ -213,18 +68,17 @@ func structFieldToField(sf reflect.StructField) (*pipepb.Field, error) {
 	if tag := sf.Tag.Get("beam"); tag != "" {
 		name, _ = parseTag(tag)
 	}
-	ftype, opts, err := reflectTypeToFieldType(sf.Type)
+	ftype, err := reflectTypeToFieldType(sf.Type)
 	if err != nil {
 		return nil, err
 	}
 	return &pipepb.Field{
-		Name:    name,
-		Type:    ftype,
-		Options: opts,
+		Name: name,
+		Type: ftype,
 	}, nil
 }
 
-func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Option, error) {
+func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, error) {
 	var isPtr bool
 	t := ot
 	if t.Kind() == reflect.Ptr {
@@ -233,13 +87,13 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 	}
 	switch t.Kind() {
 	case reflect.Map:
-		kt, _, err := reflectTypeToFieldType(t.Key())
+		kt, err := reflectTypeToFieldType(t.Key())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert key of %v to schema field", ot)
 		}
-		vt, _, err := reflectTypeToFieldType(t.Elem())
+		vt, err := reflectTypeToFieldType(t.Elem())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert value of %v to schema field", ot)
 		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
@@ -249,11 +103,11 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					ValueType: vt,
 				},
 			},
-		}, goIntOptions(t), nil
+		}, nil
 	case reflect.Struct:
 		sch, err := structToSchema(t)
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert %v to schema field", ot)
 		}
 		return &pipepb.FieldType{
 			Nullable: isPtr,
@@ -262,7 +116,7 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					Schema: sch,
 				},
 			},
-		}, nil, nil
+		}, nil
 	case reflect.Slice, reflect.Array:
 		// Special handling for []byte
 		if t == reflectx.ByteSlice {
@@ -271,13 +125,12 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 				TypeInfo: &pipepb.FieldType_AtomicType{
 					AtomicType: pipepb.AtomicType_BYTES,
 				},
-			}, nil, nil
+			}, nil
 		}
-		vt, _, err := reflectTypeToFieldType(t.Elem())
+		vt, err := reflectTypeToFieldType(t.Elem())
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
+			return nil, errors.Wrapf(err, "unable to convert element type of %v to schema field", ot)
 		}
-		opts := goIntOptions(t)
 		return &pipepb.FieldType{
 			Nullable: isPtr,
 			TypeInfo: &pipepb.FieldType_ArrayType{
@@ -285,9 +138,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 					ElementType: vt,
 				},
 			},
-		}, opts, nil
+		}, nil
 	case reflect.Interface, reflect.Chan, reflect.UnsafePointer, reflect.Complex128, reflect.Complex64:
-		return nil, nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
+		return nil, errors.Errorf("unable to convert unsupported type %v to schema", ot)
 	default: // must be an atomic type
 		if enum, ok := reflectTypeToAtomicTypeMap[t.Kind()]; ok {
 			return &pipepb.FieldType{
@@ -295,9 +148,9 @@ func reflectTypeToFieldType(ot reflect.Type) (*pipepb.FieldType, []*pipepb.Optio
 				TypeInfo: &pipepb.FieldType_AtomicType{
 					AtomicType: enum,
 				},
-			}, goIntOptions(t), nil
+			}, nil
 		}
-		return nil, nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
+		return nil, errors.Errorf("unable to map %v to pipepb.AtomicType", t)
 	}
 }
 
@@ -325,32 +178,20 @@ func ToType(s *pipepb.Schema) (reflect.Type, error) {
 		}
 		fields = append(fields, rf)
 	}
-	ret := reflect.StructOf(fields)
-	if ut, ok := syntheticToUser[ret]; ok {
-		ret = ut
-	}
-	if t := nillableFromOptions(s.GetOptions(), ret); t != nil {
-		return t, nil
-	}
-	return ret, nil
+	return reflect.StructOf(fields), nil
 }
 
 func fieldToStructField(sf *pipepb.Field) (reflect.StructField, error) {
 	name := sf.GetName()
-	rt, err := fieldTypeToReflectType(sf.GetType(), sf.Options)
+	rt, err := fieldTypeToReflectType(sf.GetType())
 	if err != nil {
 		return reflect.StructField{}, err
 	}
-
-	rsf := reflect.StructField{
+	return reflect.StructField{
 		Name: strings.ToUpper(name[:1]) + name[1:], // Go field name must be capitalized for export and encoding.
 		Type: rt,
-	}
-	// Add a name tag if they don't match.
-	if name != rsf.Name {
-		rsf.Tag = reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name))
-	}
-	return rsf, nil
+		Tag:  reflect.StructTag(fmt.Sprintf("beam:\"%s\"", name)),
+	}, nil
 }
 
 var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
@@ -365,7 +206,7 @@ var atomicTypeToReflectType = map[pipepb.AtomicType]reflect.Type{
 	pipepb.AtomicType_BYTES:   reflectx.ByteSlice,
 }
 
-func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (reflect.Type, error) {
+func fieldTypeToReflectType(sft *pipepb.FieldType) (reflect.Type, error) {
 	var t reflect.Type
 	switch sft.GetTypeInfo().(type) {
 	case *pipepb.FieldType_AtomicType:
@@ -373,35 +214,21 @@ func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (refle
 		if t, ok = atomicTypeToReflectType[sft.GetAtomicType()]; !ok {
 			return nil, errors.Errorf("unknown atomic type: %v", sft.GetAtomicType())
 		}
-		// Handle duplicate type matchings.
-		if optT := checkOptions(opts, optGoInt, reflectx.Int); optT != nil {
-			t = optT
-		}
 	case *pipepb.FieldType_ArrayType:
-		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType(), nil)
+		rt, err := fieldTypeToReflectType(sft.GetArrayType().GetElementType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert array element type")
 		}
-		// Handle duplicate type matchings.
-		if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil {
-			rt = optT
-		}
 		t = reflect.SliceOf(rt)
 	case *pipepb.FieldType_MapType:
-		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType(), nil)
+		kt, err := fieldTypeToReflectType(sft.GetMapType().GetKeyType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert map key type")
 		}
-		if optT := checkOptions(opts, optGoIntKey, reflectx.Int); optT != nil {
-			kt = optT
-		}
-		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType(), nil)
+		vt, err := fieldTypeToReflectType(sft.GetMapType().GetValueType())
 		if err != nil {
 			return nil, errors.Wrap(err, "unable to convert map value type")
 		}
-		if optT := checkOptions(opts, optGoIntElem, reflectx.Int); optT != nil {
-			vt = optT
-		}
 		t = reflect.MapOf(kt, vt) // Panics for invalid map keys (slices/iterables)
 	case *pipepb.FieldType_RowType:
 		rt, err := ToType(sft.GetRowType().GetSchema())
@@ -412,9 +239,8 @@ func fieldTypeToReflectType(sft *pipepb.FieldType, opts []*pipepb.Option) (refle
 	// case *pipepb.FieldType_IterableType:
 	// TODO(BEAM-9615): handle IterableTypes.
 
-	//case *pipepb.FieldType_LogicalType:
+	// case *pipepb.FieldType_LogicalType:
 	// TODO(BEAM-9615): handle LogicalTypes types.
-	//sft.GetLogicalType().
 
 	// Logical Types are for things that have more specialized user representation already, or
 	// things like Time or protocol buffers.
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index 741b79f..f6b5ff7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -20,32 +20,10 @@ import (
 	"reflect"
 	"testing"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-	"github.com/golang/protobuf/proto"
 	"github.com/google/go-cmp/cmp"
-	"google.golang.org/protobuf/testing/protocmp"
 )
 
-type registeredType struct {
-	A, B string
-	C    bool
-}
-
-type sRegisteredType struct {
-	D int32
-}
-
-type justAType struct {
-	A, B string
-	C    int
-}
-
-func init() {
-	runtime.RegisterType(reflect.TypeOf((*registeredType)(nil)))
-	RegisterType(reflect.TypeOf((*sRegisteredType)(nil)))
-}
-
 func TestSchemaConversion(t *testing.T) {
 	tests := []struct {
 		st *pipepb.Schema
@@ -172,177 +150,6 @@ func TestSchemaConversion(t *testing.T) {
 			rt: reflect.TypeOf(struct {
 				Payloads [][]byte `beam:"payloads"`
 			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "AString",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "AnIntPtr",
-						Type: &pipepb.FieldType{
-							Nullable: true,
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT32,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(struct {
-				AString  string
-				AnIntPtr *int32
-			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "A",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "B",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "C",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_BOOLEAN,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(registeredType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "D",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT32,
-							},
-						},
-					},
-				},
-			},
-			rt: reflect.TypeOf(sRegisteredType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					&pipepb.Field{
-						Name: "A",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "B",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_STRING,
-							},
-						},
-					},
-					&pipepb.Field{
-						Name: "C",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT64,
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoInt,
-						}},
-					},
-				},
-			},
-			rt: reflect.TypeOf(justAType{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					{
-						Name: "Q",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_MapType{
-								MapType: &pipepb.MapType{
-									KeyType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-									ValueType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-								},
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoIntKey,
-						}, {
-							Name: optGoIntElem,
-						}},
-					}, {
-						Name: "T",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_ArrayType{
-								ArrayType: &pipepb.ArrayType{
-									ElementType: &pipepb.FieldType{
-										TypeInfo: &pipepb.FieldType_AtomicType{
-											AtomicType: pipepb.AtomicType_INT64,
-										},
-									},
-								},
-							},
-						},
-						Options: []*pipepb.Option{{
-							Name: optGoIntElem,
-						}},
-					},
-				},
-			},
-			rt: reflect.TypeOf(struct {
-				Q map[int]int
-				T []int
-			}{}),
-		}, {
-			st: &pipepb.Schema{
-				Fields: []*pipepb.Field{
-					{
-						Name: "SuperNES",
-						Type: &pipepb.FieldType{
-							TypeInfo: &pipepb.FieldType_AtomicType{
-								AtomicType: pipepb.AtomicType_INT16,
-							},
-						},
-					},
-				},
-				Options: []*pipepb.Option{{
-					Name: optGoNillable,
-				}},
-			},
-			rt: reflect.TypeOf(&struct {
-				SuperNES int16
-			}{}),
 		},
 	}
 
@@ -354,11 +161,9 @@ func TestSchemaConversion(t *testing.T) {
 				if err != nil {
 					t.Fatalf("error ToType(%v) = %v", test.st, err)
 				}
-				if !test.rt.AssignableTo(got) {
-					t.Errorf("%v not assignable to %v", test.rt, got)
-					if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" {
-						t.Errorf("diff (-want, +got): %v", d)
-					}
+
+				if d := cmp.Diff(reflect.New(test.rt).Elem().Interface(), reflect.New(got).Elem().Interface()); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
 				}
 			}
 			{
@@ -366,10 +171,8 @@ func TestSchemaConversion(t *testing.T) {
 				if err != nil {
 					t.Fatalf("error FromType(%v) = %v", test.rt, err)
 				}
-				if d := cmp.Diff(test.st, got,
-					protocmp.Transform(),
-					protocmp.IgnoreFields(proto.MessageV2(&pipepb.Schema{}), "id"),
-				); d != "" {
+
+				if d := cmp.Diff(test.st, got); d != "" {
 					t.Errorf("diff (-want, +got): %v", d)
 				}