Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/29 16:28:28 UTC

[GitHub] [beam] ryanthompson591 commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

ryanthompson591 commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910155133


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Also, should schema IDs ever be None? I see below that you have a schema registry. Would it make sense to register the new ID right away? Or perhaps add a comment here, something like: "schema IDs are None when not registered; these IDs will be filled in in schemas.py"?
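
   To make the second option concrete, here is a minimal sketch of lazy ID assignment. ToyRowType and ensure_schema_id are hypothetical names, not Beam's API, and using uuid to generate the ID is an assumption for illustration only.

```python
import uuid
from typing import Optional


class ToyRowType:
  """Hypothetical stand-in for RowTypeConstraint; only tracks a schema id."""
  def __init__(self) -> None:
    # None means "not registered yet"; an id is assigned on first use.
    self.schema_id: Optional[str] = None


def ensure_schema_id(row_type: ToyRowType) -> str:
  # Assign an id lazily, the first time one is needed, then reuse it.
  if row_type.schema_id is None:
    row_type.schema_id = str(uuid.uuid4())
  return row_type.schema_id


row = ToyRowType()
first = ensure_schema_id(row)
second = ensure_schema_id(row)
```

   With this shape, None has one documented meaning (not yet registered), and repeated conversions reuse the same ID.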



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)

Review Comment:
   When I see something like a return in the middle of a long method, I start to think the method should be broken down into smaller, named methods.



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
     result = self.by_id.get(unique_id, None)
     return result[1] if result is not None else None
 
+  def get_id_by_typing(self, typing):
+    result = self.by_typing.get(typing, None)

Review Comment:
   I'm not sure whether it will hurt line length, but I'd prefer
   id_schema_tuple as the variable name instead of result.
   
   Or:
   schema_id, _ = self.by_typing.get(typing, (None, None))
   return schema_id
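
   A self-contained sketch of the unpacking variant, assuming by_typing maps a user type to an (id, schema) tuple. ToySchemaRegistry and its add signature are hypothetical stand-ins, not the class in schemas.py.

```python
from typing import Any, Dict, Optional, Tuple


class ToySchemaRegistry:
  """Hypothetical stand-in for the schema registry (not Beam's class)."""
  def __init__(self) -> None:
    # Assumed layout: user type -> (schema id, schema).
    self.by_typing: Dict[Any, Tuple[str, Any]] = {}

  def add(self, typing: Any, schema_id: str, schema: Any) -> None:
    self.by_typing[typing] = (schema_id, schema)

  def get_id_by_typing(self, typing: Any) -> Optional[str]:
    # Defaulting to a (None, None) pair keeps the unpacking valid on a miss.
    schema_id, _ = self.by_typing.get(typing, (None, None))
    return schema_id


registry = ToySchemaRegistry()
registry.add(int, "schema-1", object())
```

   The local name schema_id also avoids shadowing the built-in id.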



##########
sdks/python/apache_beam/typehints/trivial_inference.py:
##########
@@ -438,8 +438,11 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
           from apache_beam.pvalue import Row
           if state.stack[-pop_count].value == Row:
             fields = state.stack[-1].value
-            return_type = row_type.RowTypeConstraint(
-                zip(fields, Const.unwrap_all(state.stack[-pop_count + 1:-1])))
+            return_type = row_type.RowTypeConstraint.from_fields(
+                list(
+                    zip(
+                        fields,
+                        Const.unwrap_all(state.stack[-pop_count + 1:-1]))))

Review Comment:
   The logic of this line was hard for me to follow.
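
   For what it's worth, naming the intermediate values might make it easier to read. The sketch below uses a plain list as a hypothetical stand-in for state.stack (the real entries are wrapped constants that need unwrapping), and assumes the layout is the callable, then the field values, then a tuple of field names.

```python
from typing import Any, List, Tuple

# Hypothetical stack for a keyword call such as Row(a=..., b=...):
# [callable, type_of_a, type_of_b, ("a", "b")]
stack: List[Any] = ["Row", int, str, ("a", "b")]
pop_count = len(stack)  # callable + field values + names tuple

# The last entry holds the keyword names.
field_names: Tuple[str, ...] = stack[-1]
# Everything between the callable and the names tuple is a field value.
field_types = stack[-pop_count + 1:-1]
# Pair each name with its type, in order.
fields = list(zip(field_names, field_types))
```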



##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint
+    if not self.element_type is RowTypeConstraint:
+      return None

Review Comment:
   Can you add a comment explaining why we return None here?
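
   While documenting the None return, it may also be worth double-checking the guard itself. In the sketch below, which uses a hypothetical stand-in class rather than Beam's RowTypeConstraint, an identity comparison against the class is False for every instance, so a guard of the form "not x is C" would take the early return for instances as well. Whether element_type is an instance or the class itself at this point is an assumption on my part.

```python
class RowTypeConstraint:
  """Hypothetical minimal stand-in, not Beam's class."""
  def __init__(self, fields):
    self._fields = tuple(fields)


element_type = RowTypeConstraint([("id", int)])

# Identity comparison against the class is False for any instance.
identity_match = element_type is RowTypeConstraint
# isinstance reports whether the value is an instance of the class.
instance_match = isinstance(element_type, RowTypeConstraint)
```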



##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint

Review Comment:
   All other imports are at the top of the file. Why is this import inlined?



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Not sure if it's cleaner, but getattr accepts a default value:
   
   self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID, None)
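
   A small sketch of the three-argument getattr form. The default is returned as-is when the attribute is missing, so it should be the value itself (None) rather than a callable. Annotated, Plain, and schema_id_of are hypothetical names used only for illustration.

```python
_BEAM_SCHEMA_ID = "_beam_schema_id"


class Annotated:
  """Hypothetical user type that already carries a schema id."""
  _beam_schema_id = "abc-123"


class Plain:
  """Hypothetical user type without a schema id."""


def schema_id_of(user_type):
  # Returns the attribute if present, otherwise the default (None).
  # This also covers user_type=None, since None lacks the attribute.
  return getattr(user_type, _BEAM_SCHEMA_ID, None)
```

   This collapses the "is not None and hasattr" check and the else branch into a single expression.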


