Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/11 00:23:11 UTC

[GitHub] [beam] lostluck opened a new pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding.

lostluck opened a new pull request #14192:
URL: https://github.com/apache/beam/pull/14192


   *  Make map encoding deterministic.
   *  Handle embedded Exported fields in the Row coder.
   *  Translate embedded fields to schemas.
   
   Tests cover all non-error cases.
   
   I'm still working on a cleanup and additional testing for Logical types. There's a bunch of code I need to deduplicate, since the duplication isn't useful, and the tests don't cover as much as desired regarding logical types.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck commented on pull request #14192:
URL: https://github.com/apache/beam/pull/14192#issuecomment-801246527


   Run Go PreCommit





[GitHub] [beam] lostluck commented on pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck commented on pull request #14192:
URL: https://github.com/apache/beam/pull/14192#issuecomment-797096013


   Run Go PreCommit





[GitHub] [beam] lostluck commented on a change in pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r596180354



##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -114,16 +114,38 @@ func (b *RowDecoderBuilder) decoderForType(t reflect.Type) (func(io.Reader) (int
 // decoderForStructReflect returns a reflection based decoder function for the
 // given struct type.
 func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type) (func(reflect.Value, io.Reader) error, error) {
-
 	var coder typeDecoderReflect
 	coder.typ = t
 	for i := 0; i < t.NumField(); i++ {
 		i := i // avoid alias issues in the closures.
 		sf := t.Field(i)
 		isUnexported := sf.PkgPath != ""
-		if isUnexported {
+		if sf.Anonymous {
+			ft := sf.Type
+			if ft.Kind() == reflect.Ptr {
+				// If a struct embeds a pointer to an unexported type,
+				// it is not possible to set a newly allocated value
+				// since the field is unexported.
+				//
+				// See https://golang.org/issue/21357
+				//
+				// Since the values are created by this package reflectively,
+				// there's no work around like pre-allocating the field
+				// manually.
+				if isUnexported {

Review comment:
       This one is pretty complicated.
   A reflect.StructField.PkgPath has this documentation:
   `PkgPath is the package path that qualifies a lower case (unexported) field name. It is empty for upper case (exported) field names.` So normally, it refers only to the field itself.
   This changes when it's an embedded field. As far as the AST (abstract syntax tree) and reflective representations are concerned, an embedded field is named *the same as the type being embedded*. This means that if you're embedding an unexported type, the embedded field itself is reported as unexported, even if that type has Exported fields of its own. So in the block where we check whether the field is Anonymous (AKA embedded), we know that the exported state of the field indicates whether the embedded type is unexported.
   But you want to be able to access those fields and methods. The compiler promotes the methods of embedded fields to the containing type, so any interfaces the embedded type satisfies are also satisfied by the containing type. So, in this case, we want to serialize the exported fields of the embedded type, whether or not the type itself is exported, since the user expectation is that they can access them.
   
   Relatedly, if the embedded type is a pointer to an unexported type, there's no way for us to synthetically allocate a value and assign it to that field, hence the big comment there. A non-pointer embed doesn't have this problem, because the fields of the embedded type are part of the containing type's allocation, so we can access them as expected. The Go reflect library deliberately refuses to set unexported fields, but that restriction doesn't extend to the exported fields of an unexported embedded type.







[GitHub] [beam] lostluck commented on a change in pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r596200323



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {

Review comment:
       Very similar means they aren't the same. Also, structToSchema *always and only* handles structs, not pointers to structs. If a pointer type is registered for special handling, structToSchema will *never* catch it.
   
   Originally structToSchema handled both, but that led to *so many bugs*, which this current approach avoids. You're right that there's probably a further reduction opportunity here, but the cleanups done here are already far enough from what I have in my internal client that I want to limit further drift now.







[GitHub] [beam] lostluck commented on a change in pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r596209027



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {
+		schm := ftype.GetRowType().GetSchema()
+		schm = proto.Clone(schm).(*pipepb.Schema)
+		if ot.Kind() == reflect.Ptr {
+			schm.Options = append(schm.Options, &pipepb.Option{
+				Name: optGoNillable,
+			})
+		}
+		if lID != "" {
+			schm.Options = append(schm.Options, logicalOption(lID))
+		}
+		schm.Id = r.getNextID()
+		r.typeToSchema[ot] = schm
+		r.idToType[schm.GetId()] = ot
+		return schm, nil
 	}
-	return schm, nil
+
+	t := reflectx.SkipPtr(ot)
+
+	schm, err := r.structToSchema(t)
+	if err != nil {
+		return nil, err
+	}
+	// Cache the pointer type here with it's own id.
+	pt := reflect.PtrTo(t)
+	schm = proto.Clone(schm).(*pipepb.Schema)
+	schm.Id = r.getNextID()
+	schm.Options = append(schm.Options, &pipepb.Option{
+		Name: optGoNillable,
+	})
+	r.idToType[schm.GetId()] = pt
+	r.typeToSchema[pt] = schm
+
+	// Return whatever the original type was.
+	return r.typeToSchema[ot], nil

Review comment:
       Addressed this comment together with the earlier one by adding a comment to fromType about its purpose.







[GitHub] [beam] youngoli commented on a change in pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

youngoli commented on a change in pull request #14192:
URL: https://github.com/apache/beam/pull/14192#discussion_r593553104



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {

Review comment:
       I notice the beginning of `structToSchema` is very similar to this section of `fromType`, and it gets called right after this section anyway. Is there a reason this needs to be here instead of just calling `structToSchema` right away? (If so it might be good to leave a comment explaining it)

##########
File path: sdks/go/pkg/beam/core/graph/coder/row_decoder.go
##########
@@ -114,16 +114,38 @@ func (b *RowDecoderBuilder) decoderForType(t reflect.Type) (func(io.Reader) (int
 // decoderForStructReflect returns a reflection based decoder function for the
 // given struct type.
 func (b *RowDecoderBuilder) decoderForStructReflect(t reflect.Type) (func(reflect.Value, io.Reader) error, error) {
-
 	var coder typeDecoderReflect
 	coder.typ = t
 	for i := 0; i < t.NumField(); i++ {
 		i := i // avoid alias issues in the closures.
 		sf := t.Field(i)
 		isUnexported := sf.PkgPath != ""
-		if isUnexported {
+		if sf.Anonymous {
+			ft := sf.Type
+			if ft.Kind() == reflect.Ptr {
+				// If a struct embeds a pointer to an unexported type,
+				// it is not possible to set a newly allocated value
+				// since the field is unexported.
+				//
+				// See https://golang.org/issue/21357
+				//
+				// Since the values are created by this package reflectively,
+				// there's no work around like pre-allocating the field
+				// manually.
+				if isUnexported {

Review comment:
       Does `isUnexported` describe whether the field itself is unexported? Or whether the type of the field is unexported? I thought it was the first, but based on this line and the comment you added at line 144, it's actually the second one. Is that right?

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
##########
@@ -280,82 +301,194 @@ func (r *Registry) FromType(ot reflect.Type) (*pipepb.Schema, error) {
 	return r.fromType(ot)
 }
 
+func (r *Registry) logicalTypeToFieldType(t reflect.Type) (*pipepb.FieldType, string, error) {
+	// Check if a logical type was registered that matches this struct type directly
+	// and if so, extract the schema from it for use.
+	if lID, ok := r.logicalTypeIdentifiers[t]; ok {
+		lt := r.logicalTypes[lID]
+		ftype, err := r.reflectTypeToFieldType(lt.StorageType())
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", lID, lt.StorageType(), lt.GoType())
+		}
+		return ftype, lID, nil
+	}
+	for _, lti := range r.logicalTypeInterfaces {
+		if !t.Implements(lti) {
+			continue
+		}
+		p := r.logicalTypeProviders[lti]
+		st, err := p(t)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v] using provider for %v schema field", t, lti)
+		}
+		if st == nil {
+			continue
+		}
+		ftype, err := r.reflectTypeToFieldType(st)
+		if err != nil {
+			return nil, "", errors.Wrapf(err, "unable to convert LogicalType[%v]'s storage type %v for Go type of %v to a schema", "interface", st, t)
+		}
+		return ftype, t.String(), nil
+	}
+	return nil, "", nil
+}
+
 func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
-	if reflectx.SkipPtr(ot).Kind() != reflect.Struct {
-		return nil, errors.Errorf("cannot convert %v to schema. FromType only converts structs to schemas", ot)
+	if schm, ok := r.typeToSchema[ot]; ok {
+		return schm, nil
 	}
-	schm, err := r.structToSchema(ot)
+	ftype, lID, err := r.logicalTypeToFieldType(ot)
 	if err != nil {
 		return nil, err
 	}
-	if ot.Kind() == reflect.Ptr {
-		schm.Options = append(schm.Options, &pipepb.Option{
-			Name: optGoNillable,
-		})
+	if ftype != nil {
+		schm := ftype.GetRowType().GetSchema()
+		schm = proto.Clone(schm).(*pipepb.Schema)
+		if ot.Kind() == reflect.Ptr {
+			schm.Options = append(schm.Options, &pipepb.Option{
+				Name: optGoNillable,
+			})
+		}
+		if lID != "" {
+			schm.Options = append(schm.Options, logicalOption(lID))
+		}
+		schm.Id = r.getNextID()
+		r.typeToSchema[ot] = schm
+		r.idToType[schm.GetId()] = ot
+		return schm, nil
 	}
-	return schm, nil
+
+	t := reflectx.SkipPtr(ot)
+
+	schm, err := r.structToSchema(t)
+	if err != nil {
+		return nil, err
+	}
+	// Cache the pointer type here with it's own id.
+	pt := reflect.PtrTo(t)
+	schm = proto.Clone(schm).(*pipepb.Schema)
+	schm.Id = r.getNextID()
+	schm.Options = append(schm.Options, &pipepb.Option{
+		Name: optGoNillable,
+	})
+	r.idToType[schm.GetId()] = pt
+	r.typeToSchema[pt] = schm
+
+	// Return whatever the original type was.
+	return r.typeToSchema[ot], nil

Review comment:
       After reading a little further down here, I think I might be missing some implicit assumption here. Is everything after line 360 assuming that if the type wasn't found above in either `typeToSchema` or `logicalTypeToFieldType` then it must be a pointer type? Because we return the original type, but it wouldn't be present in that map unless `ot` and `pt` matched, right?
   
   Edit: Or maybe this works here because we called `structToSchema` on the non-pointer version, and then also added the pointer version, so one or the other is guaranteed to be present? That seems worth a comment too, like:
   ```
   // Both pointer and non-pointer variants of the original type are registered, so we can return whatever the original type was.
   ```







[GitHub] [beam] lostluck commented on pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding.

lostluck commented on pull request #14192:
URL: https://github.com/apache/beam/pull/14192#issuecomment-796316879


   R: @youngoli 
   CC: @tysonjh 





[GitHub] [beam] lostluck merged pull request #14192: [BEAM-9615] Embedded structs and Deterministic map encoding, and Logical Types.

lostluck merged pull request #14192:
URL: https://github.com/apache/beam/pull/14192


   

