You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/29 17:46:46 UTC

[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910234889


##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in

Review Comment:
   Can you clarify why this uses `typ` as the variable name (instead of `type`)?



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards compatibility guarantees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   Do we have to annotate the return type as the string `'RowTypeConstraint'` because this is a `@staticmethod`?



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -41,6 +43,52 @@
 from apache_beam.typehints.schemas import typing_to_runner_api
 from apache_beam.utils.timestamp import Timestamp
 
+all_nonoptional_primitives = [
+    np.int8,
+    np.int16,
+    np.int32,
+    np.int64,
+    np.float32,
+    np.float64,
+    bool,
+    bytes,
+    str,
+]
+
+all_optional_primitives = [Optional[typ] for typ in all_nonoptional_primitives]
+
+all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+basic_array_types = [Sequence[typ] for typ in all_primitives]
+
+basic_map_types = [
+    Mapping[key_type, value_type] for key_type,
+    value_type in itertools.product(all_primitives, all_primitives)
+]
+
+
+class AllPrimitives(NamedTuple):
+  field_int8: np.int8
+  field_int16: np.int16
+  field_int32: np.int32
+  field_int64: np.int64
+  field_float32: np.float32
+  field_float64: np.float64
+  field_bool: bool
+  field_bytes: bytes
+  field_str: str
+  field_optional_bool: Optional[bool]
+  field_optional_int32: Optional[np.int32]

Review Comment:
   Do we want to add the optional versions for the others as well (`int8`, `float32`, etc.)?



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in
+      all_primitives + \
+      basic_array_types + \
+      basic_map_types]
+                        )
+  def test_typing_survives_proto_roundtrip(self, typ):
+    self.assertEqual(
+        typ,
+        typing_from_runner_api(
+            typing_to_runner_api(typ, schema_registry=SchemaTypeRegistry()),
+            schema_registry=SchemaTypeRegistry()))
 
-    test_cases = all_primitives + \
-                 basic_array_types + \
-                 basic_map_types
+  @parameterized.expand([(AllPrimitives, ), (ComplexSchema, )])

Review Comment:
   Does `parameterized.expand` require each test case to be a single-element tuple, i.e. `(AllPrimitives, )` or `(ComplexSchema, )`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org