You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/13 04:33:49 UTC

[GitHub] [beam] youngoli commented on a change in pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

youngoli commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r593553104



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {

Review comment:
       I notice the beginning of `structToSchema` is very similar to this section of `fromType`, and `structToSchema` gets called right after this section anyway. Is there a reason this needs to be here instead of just calling `structToSchema` right away? (If so, it might be good to leave a comment explaining it.)

##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -114,16 +114,38 @@ func (b *RowDecoderBuilder) decoderForType(t reflect.Type) (func(io.Reader) (int
 // decoderForStructReflect returns a reflection based decoder function for the
 // given struct type.
 func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type) (func(reflect.Value, io.Reader) error, error) {
-
 	var coder typeDecoderReflect
 	coder.typ = t
 	for i := 0; i < t.NumField(); i++ {
 		i := i // avoid alias issues in the closures.
 		sf := t.Field(i)
 		isUnexported := sf.PkgPath != ""
-		if isUnexported {
+		if sf.Anonymous {
+			ft := sf.Type
+			if ft.Kind() == reflect.Ptr {
+				// If a struct embeds a pointer to an unexported type,
+				// it is not possible to set a newly allocated value
+				// since the field is unexported.
+				//
+				// See https://golang.org/issue/21357
+				//
+				// Since the values are created by this package reflectively,
+				// there's no work around like pre-allocating the field
+				// manually.
+				if isUnexported {

Review comment:
       Does `isUnexported` describe whether the field itself is unexported, or whether the type of the field is unexported? I thought it was the first, but based on this line and the comment you added at line 144, it looks like it's actually the second. Is that right?

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {
+		schm := ftype.GetRowType().GetSchema()
+		schm = proto.Clone(schm).(*pipepb.Schema)
+		if ot.Kind() == reflect.Ptr {
+			schm.Options = append(schm.Options, &pipepb.Option{
+				Name: optGoNillable,
+			})
+		}
+		if lID != "" {
+			schm.Options = append(schm.Options, logicalOption(lID))
+		}
+		schm.Id = r.getNextID()
+		r.typeToSchema[ot] = schm
+		r.idToType[schm.GetId()] = ot
+		return schm, nil
 	}
-	return schm, nil
+
+	t := reflectx.SkipPtr(ot)
+
+	schm, err := r.structToSchema(t)
+	if err != nil {
+		return nil, err
+	}
+	// Cache the pointer type here with it's own id.
+	pt := reflect.PtrTo(t)
+	schm = proto.Clone(schm).(*pipepb.Schema)
+	schm.Id = r.getNextID()
+	schm.Options = append(schm.Options, &pipepb.Option{
+		Name: optGoNillable,
+	})
+	r.idToType[schm.GetId()] = pt
+	r.typeToSchema[pt] = schm
+
+	// Return whatever the original type was.
+	return r.typeToSchema[ot], nil

Review comment:
       After reading a little further down, I think I might be missing some implicit assumption here. Is everything after line 360 assuming that if the type wasn't found above in either `typeToSchema` or `logicalTypeToFieldType`, then it must be a pointer type? We return the original type, but it wouldn't be present in that map unless `ot` and `pt` matched, right?
   
   Edit: Or maybe this works because we called `structToSchema` on the non-pointer version and then also added the pointer version, so one or the other is guaranteed to be present? That also seems worth a comment, like:
   ```
   // Both pointer and non-pointer variants of the original type are registered, so we can return whatever the original type was.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org