Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2019/11/22 14:49:00 UTC

[jira] [Commented] (BEAM-8732) Add support for additional structured types to Schemas/RowCoders

    [ https://issues.apache.org/jira/browse/BEAM-8732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16980206#comment-16980206 ] 

Brian Hulette commented on BEAM-8732:
-------------------------------------

I'm definitely fine with you taking this on, but I'm not sure that calling {{typing_to_runner_api()}} is all that needs to happen. Since that function generates a random UUID for the schema, I think the schema will get one UUID at pipeline construction time and a different one at pipeline execution time (on the workers). The workers then won't be able to look up the correct class in the registry and will generate a new one instead. Perhaps one solution would be to make the UUID generation deterministic somehow? But I'm not sure how feasible that is.
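One way the id could be made deterministic, sketched under assumptions: derive it with {{uuid.uuid5}} from the type's qualified name and its (field name, field type) pairs instead of {{uuid4}}. {{deterministic_schema_id}} and {{IdRegistry}} are hypothetical helpers, not part of Beam; the shared namespace and the string encoding of the fields are arbitrary illustrative choices. The sketch only shows that two independent registries (construction time vs. a worker) compute the same id for the same type.

```python
import uuid
from typing import Dict, Optional, Sequence, Tuple

# Assumption: a fixed namespace shared by the pipeline-construction process
# and the workers. uuid.NAMESPACE_URL is used here purely for illustration.
_SCHEMA_NAMESPACE = uuid.NAMESPACE_URL


def deterministic_schema_id(type_name, fields):
  # type: (str, Sequence[Tuple[str, str]]) -> str
  """Derives a schema id from a type name and its (field name, field type) pairs.

  uuid5 hashes the namespace and the name with SHA-1, so identical inputs give
  identical ids in every process, unlike the random uuid4.
  """
  canonical = '%s(%s)' % (
      type_name, ', '.join('%s: %s' % field for field in fields))
  return str(uuid.uuid5(_SCHEMA_NAMESPACE, canonical))


class IdRegistry(object):
  """Hypothetical per-process registry mapping schema ids to user types."""

  def __init__(self):
    self._type_by_id = {}  # type: Dict[str, type]

  def register(self, user_type, fields):
    # type: (type, Sequence[Tuple[str, str]]) -> str
    type_name = '%s.%s' % (user_type.__module__, user_type.__qualname__)
    schema_id = deterministic_schema_id(type_name, fields)
    self._type_by_id[schema_id] = user_type
    return schema_id

  def get_type_by_id(self, schema_id):
    # type: (str) -> Optional[type]
    return self._type_by_id.get(schema_id)


class Point(object):
  pass


POINT_FIELDS = [('x', 'int64'), ('y', 'double')]
construction_registry = IdRegistry()  # pipeline construction time
worker_registry = IdRegistry()        # stands in for a fresh worker process
schema_id = construction_registry.register(Point, POINT_FIELDS)
worker_registry.register(Point, POINT_FIELDS)
# The worker can resolve the id that was recorded at construction time.
assert worker_registry.get_type_by_id(schema_id) is Point
```

The catch is that every input to the id has to match exactly on both sides: if the class or the spelling of a field type differs between construction and the workers, the ids diverge (arguably the right outcome if the schema really changed).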

In general, the Java SDK has solved this problem by including serialized Java classes in the pipeline graph. For example, [SchemaCoder|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java] is currently represented as one large serialized class that includes the user class and the functions for converting to/from Row. We also currently allow [Java-specific logical types|https://github.com/apache/beam/blob/07d952f313477ee18cdc706100ba7e1810b1ef4f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SchemaTranslation.java#L91-L102], which serialize a class for converting to/from user types.

cc: [~reuvenlax] who is more familiar with the Java side of this than me.

> Add support for additional structured types to Schemas/RowCoders
> ----------------------------------------------------------------
>
>                 Key: BEAM-8732
>                 URL: https://issues.apache.org/jira/browse/BEAM-8732
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chad Dombrova
>            Priority: Major
>
> Currently we can convert between a {{NamedTuple}} type and its {{Schema}} proto using {{named_tuple_from_schema}} and {{named_tuple_to_schema}}. I'd like to introduce a system to support additional types, starting with structured types like {{attrs}}, {{dataclasses}}, and {{TypedDict}}.
> I've only just started digesting the code, but this task seems pretty straightforward. For example, I think the type-to-schema code would look roughly like this:
> {code:python}
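> from typing import Type
> from uuid import uuid4
>
> from apache_beam.portability.api import schema_pb2
>
> def typing_to_runner_api(type_):
>   # type: (Type) -> schema_pb2.FieldType
>   # _get_structured_handler (proposed) returns a handler for attrs classes,
>   # dataclasses, TypedDicts, etc., or None for any other type.
>   structured_handler = _get_structured_handler(type_)
>   if structured_handler:
>     schema = None
>     # Types that record a schema id reuse the already-registered schema.
>     if hasattr(type_, 'id'):
>       schema = SCHEMA_REGISTRY.get_schema_by_id(type_.id)
>     if schema is None:
>       # Otherwise build a schema from the type's fields and register it
>       # under a freshly generated (random) id.
>       fields = structured_handler.get_fields()
>       type_id = str(uuid4())
>       schema = schema_pb2.Schema(fields=fields, id=type_id)
>       SCHEMA_REGISTRY.add(type_, schema)
>     return schema_pb2.FieldType(
>         row_type=schema_pb2.RowType(
>             schema=schema))
>   # ... existing handling of primitive, NamedTuple, and other types ...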
> {code}
> The rest of the work would be implementing a class hierarchy for working with structured types: for example, getting the list of field values from an instance, and creating an instance from a list of field values. Eventually we can extend this behavior to arbitrary, unstructured types.
> Going in the schema-to-type direction, we have the problem of choosing which type to use for a given schema. I believe that as long as {{typing_to_runner_api()}} has been called on our structured type in the current Python session, the type will have been added to the registry and should round-trip correctly, so I think we just need a public function for registering schemas for structured types.
> [~bhulette] Did you want to tackle this or are you ok with me going after it?
>  
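The class hierarchy described in the issue above (list a type's fields, flatten an instance into field values, and rebuild an instance from them) could be sketched roughly as follows. This is a hedged illustration, not the eventual Beam implementation: {{StructuredHandler}}, {{DataclassHandler}}, {{TypedDictHandler}}, and {{get_structured_handler}} are hypothetical names (the last stands in for the proposal's {{_get_structured_handler}}), {{get_fields()}} returns Python types rather than {{schema_pb2}} fields, and only stdlib dataclasses and {{TypedDict}} (Python 3.8+) are covered so that it runs without extra dependencies.

```python
import dataclasses
import typing
from typing import Any, List, Optional, Tuple


class StructuredHandler(object):
  """Hypothetical base class: converts a structured type to/from field values."""

  def __init__(self, type_):
    self.type_ = type_

  def get_fields(self):
    # type: () -> List[Tuple[str, Any]]
    """Returns ordered (name, Python type) pairs for the type's fields."""
    raise NotImplementedError

  def to_values(self, instance):
    # type: (Any) -> List[Any]
    raise NotImplementedError

  def from_values(self, values):
    # type: (List[Any]) -> Any
    raise NotImplementedError

  def field_names(self):
    # type: () -> List[str]
    return [name for name, _ in self.get_fields()]


class DataclassHandler(StructuredHandler):
  def get_fields(self):
    # dataclasses.fields() omits ClassVar and InitVar pseudo-fields.
    return [(f.name, f.type) for f in dataclasses.fields(self.type_)]

  def to_values(self, instance):
    return [getattr(instance, name) for name in self.field_names()]

  def from_values(self, values):
    # Assumes every field is accepted by __init__ (no field(init=False)).
    return self.type_(**dict(zip(self.field_names(), values)))


class TypedDictHandler(StructuredHandler):
  def get_fields(self):
    return list(typing.get_type_hints(self.type_).items())

  def to_values(self, instance):
    return [instance[name] for name in self.field_names()]

  def from_values(self, values):
    # TypedDict instances are plain dicts at runtime.
    return dict(zip(self.field_names(), values))


def _is_typeddict(type_):
  # typing.is_typeddict exists from Python 3.10; older versions fall back
  # to a structural check.
  check = getattr(typing, 'is_typeddict', None)
  if check is not None:
    return check(type_)
  return (isinstance(type_, type) and issubclass(type_, dict) and
          hasattr(type_, '__total__'))


def get_structured_handler(type_):
  # type: (Any) -> Optional[StructuredHandler]
  """Hypothetical stand-in for the proposal's _get_structured_handler."""
  if isinstance(type_, type) and dataclasses.is_dataclass(type_):
    return DataclassHandler(type_)
  if _is_typeddict(type_):
    return TypedDictHandler(type_)
  return None


@dataclasses.dataclass
class PointDataclass:
  x: int
  y: float


class PointDict(typing.TypedDict):
  x: int
  y: float


handler = get_structured_handler(PointDataclass)
point = PointDataclass(1, 2.0)
assert handler.from_values(handler.to_values(point)) == point
```

Keeping the per-type logic behind one interface means the schema code only needs {{get_fields()}}, while a coder only needs {{to_values()}} and {{from_values()}}. An {{attrs}} handler would be one more subclass built on {{attr.fields()}}, which likewise exposes each attribute's name and type.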



--
This message was sent by Atlassian Jira
(v8.3.4#803005)