Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/01 15:17:07 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910469134


##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Sure, `SchemaTranslation` just tracks the registry that is used for caching schemas by ID. The intention is that a registry can be specified at the entrypoint (the free functions `typing_to_runner_api` and `typing_from_runner_api`), after which control drops into the implementation, which carries that context along.
   
   These are just places I missed when moving the logic over to `SchemaTranslation` in #17108. The intention was for all recursive calls to go through `self`, with the free functions serving only as entrypoints.
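   A minimal sketch of the pattern described in this comment, in plain Python. `SchemaRegistry`, `SchemaTranslation.to_field_type`, the free function `to_field_type`, and the dict-based "schemas" are hypothetical simplifications, not Beam's actual `SchemaTypeRegistry` or `schema_pb2` API. The free function picks the registry once, and every recursive call goes through `self`, so nested row types are cached in that same registry.

```python
from typing import Dict, NamedTuple


class SchemaRegistry:
    """Hypothetical stand-in for a registry that caches schemas by ID."""

    def __init__(self, prefix: str):
        self.prefix = prefix
        self._counter = 0
        self.schemas: Dict[str, dict] = {}

    def generate_new_id(self) -> str:
        self._counter += 1
        return f"{self.prefix}-{self._counter}"

    def add(self, schema: dict) -> None:
        self.schemas[schema["id"]] = schema


DEFAULT_REGISTRY = SchemaRegistry("default")
PRIMITIVES = {int: "INT64", str: "STRING", float: "DOUBLE"}


class SchemaTranslation:
    """Carries the chosen registry through every recursive call via `self`."""

    def __init__(self, registry: SchemaRegistry):
        self.registry = registry

    def to_field_type(self, type_) -> dict:
        is_named_tuple = (
            isinstance(type_, type)
            and issubclass(type_, tuple)
            and hasattr(type_, "_fields"))
        if is_named_tuple:
            schema_id = self.registry.generate_new_id()
            # Recurse through self (not the free function) so nested row
            # types are registered in the same registry as the outer type.
            fields = [
                (name, self.to_field_type(field_type))
                for name, field_type in type_.__annotations__.items()
            ]
            schema = {"id": schema_id, "fields": fields}
            self.registry.add(schema)
            return {"row": schema}
        if type_ in PRIMITIVES:
            return {"atomic": PRIMITIVES[type_]}
        raise ValueError(f"Unsupported type: {type_!r}")


def to_field_type(type_, registry: SchemaRegistry = DEFAULT_REGISTRY) -> dict:
    """Free-function entrypoint: choose the registry once, then delegate."""
    return SchemaTranslation(registry).to_field_type(type_)


# Usage: a nested NamedTuple translated against a custom registry.
class Inner(NamedTuple):
    x: int


class Outer(NamedTuple):
    name: str
    inner: Inner


custom = SchemaRegistry("custom")
to_field_type(Outer, registry=custom)
print(sorted(custom.schemas))    # ['custom-1', 'custom-2']
print(DEFAULT_REGISTRY.schemas)  # {}
```

   If the list comprehension instead called the free function `to_field_type(field_type)`, the nested `Inner` schema would silently land in `DEFAULT_REGISTRY`. That is the same kind of slip the diff fixes by replacing `map(typing_to_runner_api, ...)` with `map(self.typing_to_runner_api, ...)`.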


