You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/27 18:24:17 UTC

[GitHub] [beam] TheNeuralBit opened a new pull request, #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

TheNeuralBit opened a new pull request, #22066:
URL: https://github.com/apache/beam/pull/22066

   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r912080430


##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in
+      all_primitives + \
+      basic_array_types + \
+      basic_map_types]
+                        )
+  def test_typing_survives_proto_roundtrip(self, typ):
+    self.assertEqual(
+        typ,
+        typing_from_runner_api(
+            typing_to_runner_api(typ, schema_registry=SchemaTypeRegistry()),
+            schema_registry=SchemaTypeRegistry()))
 
-    test_cases = all_primitives + \
-                 basic_array_types + \
-                 basic_map_types
+  @parameterized.expand([(AllPrimitives, ), (ComplexSchema, )])

Review Comment:
   I see, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1172645294

   Thanks @yeandy, @ryanthompson591. I appreciate the thorough reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1168080820

   R: @yeandy @robertwb 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910527379


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   Well it looks like PyLint fails here because flake8 can't handle these postponed annotations
   ```
   16:00:57 Running flake8...
   16:00:57 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonLint_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-lint/py37-lint/lib/python3.7/site-packages/pycodestyle.py:113: FutureWarning: Possible nested set at position 1
   16:00:57   EXTRANEOUS_WHITESPACE_REGEX = re.compile(r'[[({] | []}),;:]')
   16:01:05 apache_beam/typehints/row_type.py:76:51: F821 undefined name 'RowTypeConstraint'
   16:01:05   def from_user_type(user_type: type) -> Optional[RowTypeConstraint]:
   16:01:05                                                   ^
   16:01:05 apache_beam/typehints/row_type.py:86:58: F821 undefined name 'RowTypeConstraint'
   16:01:05   def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
   16:01:05                                                          ^
   16:01:05 2     F821 undefined name 'RowTypeConstraint'
   16:01:05 2
   16:01:05 Command exited with non-zero status 1
   16:01:05 582.99user 20.95system 1:17.53elapsed 778%CPU (0avgtext+0avgdata 677744maxresident)k
   16:01:05 16inputs+400outputs (2major+1959397minor)pagefaults 0swaps
   16:01:05 ERROR: InvocationError for command /usr/bin/time scripts/run_pylint.sh (exited with code 1)
   ```
   
   This is https://github.com/PyCQA/pyflakes/issues/356
   
   I sent https://github.com/apache/beam/pull/22110 to upgrade flake8



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910234889


##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in

Review Comment:
   Can you clarify using `typ` (instead of `type`)?



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   Do we have to return `'RowTypeConstraint'` because this is a `@staticmethod`?



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -41,6 +43,52 @@
 from apache_beam.typehints.schemas import typing_to_runner_api
 from apache_beam.utils.timestamp import Timestamp
 
+all_nonoptional_primitives = [
+    np.int8,
+    np.int16,
+    np.int32,
+    np.int64,
+    np.float32,
+    np.float64,
+    bool,
+    bytes,
+    str,
+]
+
+all_optional_primitives = [Optional[typ] for typ in all_nonoptional_primitives]
+
+all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+basic_array_types = [Sequence[typ] for typ in all_primitives]
+
+basic_map_types = [
+    Mapping[key_type, value_type] for key_type,
+    value_type in itertools.product(all_primitives, all_primitives)
+]
+
+
+class AllPrimitives(NamedTuple):
+  field_int8: np.int8
+  field_int16: np.int16
+  field_int32: np.int32
+  field_int64: np.int64
+  field_float32: np.float32
+  field_float64: np.float64
+  field_bool: bool
+  field_bytes: bytes
+  field_str: str
+  field_optional_bool: Optional[bool]
+  field_optional_int32: Optional[np.int32]

Review Comment:
   Do we want to add the optional versions for the others as well (`int8`, `float32`, etc.)?



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in
+      all_primitives + \
+      basic_array_types + \
+      basic_map_types]
+                        )
+  def test_typing_survives_proto_roundtrip(self, typ):
+    self.assertEqual(
+        typ,
+        typing_from_runner_api(
+            typing_to_runner_api(typ, schema_registry=SchemaTypeRegistry()),
+            schema_registry=SchemaTypeRegistry()))
 
-    test_cases = all_primitives + \
-                 basic_array_types + \
-                 basic_map_types
+  @parameterized.expand([(AllPrimitives, ), (ComplexSchema, )])

Review Comment:
   Does `parameterize` require singletons? i.e. creating `(AllPrimitives, )` or `(ComplexSchema, )`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910469134


##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Sure, `SchemaTranslation` just tracks the registry that is used for caching schemas by ID. The intention is that we can specify a registry to use at the entrypoint (the free function `typing_from_runner_api`), then drop into the implementation which keeps track of that context.
   
   These are just places that I missed when moving the logic over to use `SchemaTranslation` in #17108. The intention was for all recursive calls to use self, and only have the free functions as the input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1167773756

   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r911334896


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   #22110 is merged so PyLint issue is resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r912062806


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   > Do you mean the string 'RowTypeConstraint' rather than an actual RowTypeConstraint typehint?
   
   Yes.
   
   And thanks for the clarification and change. So basically, in Python < 3.7, you have to use the string instead of the class itself for forward annotations. In Python 3.7+, you have to use `from __future__ import annotations` if you want to use the class itself. It looks like it was going to be default in Python 3.10, but was [reverted](https://mail.python.org/archives/list/python-dev@python.org/thread/CLVXXPQ2T2LQ5MP2Y53VVQFCXYWQJHKZ/) last minute.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910527379


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   Well it looks like PyLint fails here because flake8 can't handle this postponed annotations
   ```
   16:00:57 Running flake8...
   16:00:57 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_PythonLint_Commit/src/sdks/python/test-suites/tox/py37/build/srcs/sdks/python/target/.tox-py37-lint/py37-lint/lib/python3.7/site-packages/pycodestyle.py:113: FutureWarning: Possible nested set at position 1
   16:00:57   EXTRANEOUS_WHITESPACE_REGEX = re.compile(r'[[({] | []}),;:]')
   16:01:05 apache_beam/typehints/row_type.py:76:51: F821 undefined name 'RowTypeConstraint'
   16:01:05   def from_user_type(user_type: type) -> Optional[RowTypeConstraint]:
   16:01:05                                                   ^
   16:01:05 apache_beam/typehints/row_type.py:86:58: F821 undefined name 'RowTypeConstraint'
   16:01:05   def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
   16:01:05                                                          ^
   16:01:05 2     F821 undefined name 'RowTypeConstraint'
   16:01:05 2
   16:01:05 Command exited with non-zero status 1
   16:01:05 582.99user 20.95system 1:17.53elapsed 778%CPU (0avgtext+0avgdata 677744maxresident)k
   16:01:05 16inputs+400outputs (2major+1959397minor)pagefaults 0swaps
   16:01:05 ERROR: InvocationError for command /usr/bin/time scripts/run_pylint.sh (exited with code 1)
   ```
   
   This is https://github.com/PyCQA/pyflakes/issues/356
   
   I sent https://github.com/apache/beam/pull/22110 to upgrade flake8



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910399845


##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint
+    if not self.element_type is RowTypeConstraint:
+      return None

Review Comment:
   Ah sorry this is something I was experimenting with that we don't need to include in this PR. I've removed it.
   
   The idea was to provide something similar to Java's `PCollection.getSchema`: https://github.com/apache/beam/blob/de04604aee0275a1feeb216598179f445ac79445/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L332-L337
   
   (this would have returned None in the "no inferred schema" case rather than raising an error though)



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Currently we assign IDs when mapping to a Schema proto, in `apache_beam.typehints.schemas`, primarily just because the spec requires one: https://github.com/apache/beam/blob/de04604aee0275a1feeb216598179f445ac79445/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto#L40
   
   I added a comment about this, thanks for the question.



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in
+      all_primitives + \
+      basic_array_types + \
+      basic_map_types]
+                        )
+  def test_typing_survives_proto_roundtrip(self, typ):
+    self.assertEqual(
+        typ,
+        typing_from_runner_api(
+            typing_to_runner_api(typ, schema_registry=SchemaTypeRegistry()),
+            schema_registry=SchemaTypeRegistry()))
 
-    test_cases = all_primitives + \
-                 basic_array_types + \
-                 basic_map_types
+  @parameterized.expand([(AllPrimitives, ), (ComplexSchema, )])

Review Comment:
   `parameterize` requires tuples (or dictionaries) of arguments that will be passed to the test function. Even for a single function.



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None
+
+  @staticmethod
+  def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+    if match_is_named_tuple(user_type):
+      fields = [(name, user_type.__annotations__[name])
+                for name in user_type._fields]
+
+      return RowTypeConstraint(fields=fields, user_type=user_type)
+
+    return None
+
+  @staticmethod
+  def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':

Review Comment:
   Do you mean the string `'RowTypeConstraint'` rather than an actual `RowTypeConstraint` typehint?
   
   See [PEP 563](https://peps.python.org/pep-0563/) for details on that, as well as the fix for it in Python 3.7 and above.
   
   Speaking of which, I went ahead and updated this file with `from __future__ import annotations` so we can use `RowTypeConstraint` here.
   
   Please let me know if that's not what you were getting at.



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -41,6 +43,52 @@
 from apache_beam.typehints.schemas import typing_to_runner_api
 from apache_beam.utils.timestamp import Timestamp
 
+all_nonoptional_primitives = [
+    np.int8,
+    np.int16,
+    np.int32,
+    np.int64,
+    np.float32,
+    np.float64,
+    bool,
+    bytes,
+    str,
+]
+
+all_optional_primitives = [Optional[typ] for typ in all_nonoptional_primitives]
+
+all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+basic_array_types = [Sequence[typ] for typ in all_primitives]
+
+basic_map_types = [
+    Mapping[key_type, value_type] for key_type,
+    value_type in itertools.product(all_primitives, all_primitives)
+]
+
+
+class AllPrimitives(NamedTuple):
+  field_int8: np.int8
+  field_int16: np.int16
+  field_int32: np.int32
+  field_int64: np.int64
+  field_float32: np.float32
+  field_float64: np.float64
+  field_bool: bool
+  field_bytes: bytes
+  field_str: str
+  field_optional_bool: Optional[bool]
+  field_optional_int32: Optional[np.int32]

Review Comment:
   yeah, might as well. Done.



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -83,6 +83,7 @@
 from apache_beam.utils.timestamp import Timestamp
 
 PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
+PYTHON_USER_TYPE_OPTION_URN = "beam:python:user_type:v1"

Review Comment:
   Whoops, this is part of #22082 and shouldn't be here, thanks! Removed.



##########
sdks/python/apache_beam/typehints/trivial_inference.py:
##########
@@ -438,8 +438,11 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
           from apache_beam.pvalue import Row
           if state.stack[-pop_count].value == Row:
             fields = state.stack[-1].value
-            return_type = row_type.RowTypeConstraint(
-                zip(fields, Const.unwrap_all(state.stack[-pop_count + 1:-1])))
+            return_type = row_type.RowTypeConstraint.from_fields(
+                list(
+                    zip(
+                        fields,
+                        Const.unwrap_all(state.stack[-pop_count + 1:-1]))))

Review Comment:
   Yeah unfortunately all of this Python bytecode inference logic is pretty opaque. I'm only touching it here to adjust the call to `RowTypeConstraint` for my changes (change to use static method `from_fields`, make sure argument is a list, not an iterable).
   
   Is there something specific I can do to improve clarity here?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
     result = self.by_id.get(unique_id, None)
     return result[1] if result is not None else None
 
+  def get_id_by_typing(self, typing):

Review Comment:
   Same here, this is part of #22082. Thanks for calling these out.



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
   are cached by ID, so performing just one of them wouldn't necessarily exercise
   all code paths.
   """
-  def test_typing_survives_proto_roundtrip(self):
-    all_nonoptional_primitives = [
-        np.int8,
-        np.int16,
-        np.int32,
-        np.int64,
-        np.float32,
-        np.float64,
-        bool,
-        bytes,
-        str,
-    ]
-
-    all_optional_primitives = [
-        Optional[typ] for typ in all_nonoptional_primitives
-    ]
-
-    all_primitives = all_nonoptional_primitives + all_optional_primitives
-
-    basic_array_types = [Sequence[typ] for typ in all_primitives]
-
-    basic_map_types = [
-        Mapping[key_type, value_type] for key_type,
-        value_type in itertools.product(all_primitives, all_primitives)
-    ]
-
-    selected_schemas = [
-        NamedTuple(
-            'AllPrimitives',
-            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
-        NamedTuple(
-            'ComplexSchema',
-            [
-                ('id', np.int64),
-                ('name', str),
-                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
-                ('optional_array', Optional[Sequence[np.float32]]),
-                ('array_optional', Sequence[Optional[bool]]),
-                ('timestamp', Timestamp),
-            ])
-    ]
+  @parameterized.expand([(typ,) for typ in

Review Comment:
   Just to avoid the reserved word `type`. I changed this to `user_type` for clarity.



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Thanks, that does at least allow dropping the `hasattr` call



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Sure, `SchemaTranslation` just tracks the registry that is used for caching schemas by ID. The intention is that we can specify a registry to use at the entrypoint (the free function `typing_from_runner_api`), then drop into the implementation which keeps track of that context.
   
   These are just places that I missed when moving the logic over to use `SchemaTranslation` in #17108. The intention was for all recursive calls to use self, and only have the free functions as the input.



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
     result = self.by_id.get(unique_id, None)
     return result[1] if result is not None else None
 
+  def get_id_by_typing(self, typing):
+    result = self.by_typing.get(typing, None)

Review Comment:
   This is obsolete, I didn't intend to keep this code.



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)

Review Comment:
   This pattern is pretty common for recursive schema-handling logic in both Python and Java. See for example: https://github.com/apache/beam/blob/07ed486d653df440b7993679bc6226e0dc4dd6dc/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L323-L337
   
   I'm open to suggestions on how to refactor, but I'd also prefer to leave them for a separate PR to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1171531775

   Run PythonLint PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1167721045

   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1170607816

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1169189048

   Run Python PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1169383711

   @yeandy I discussed this offline with @robertwb and he is OK with the high-level approach. Would you have time to do the actual code review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ryanthompson591 commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
ryanthompson591 commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910155133


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Also, are schema ids something that should ever be none? I see below that you have a schema registry.  Would it make sense to register the new ID right away? Or perhaps add a comment here: something like, schema ids are none when not registered, these ID's will be filled in in schemas.py?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)

Review Comment:
   when I see stuff like a return in the middle of a long method I start to think this is an example of a method that should be broken down into smaller named methods.



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
     result = self.by_id.get(unique_id, None)
     return result[1] if result is not None else None
 
+  def get_id_by_typing(self, typing):
+    result = self.by_typing.get(typing, None)

Review Comment:
   not sure if it will hurt line length, but I like
   id_schema_tuple as a variable name instead of result.
   
   Or:
   id, _ = self.by_typing.get(typing, (None, None))
   return id



##########
sdks/python/apache_beam/typehints/trivial_inference.py:
##########
@@ -438,8 +438,11 @@ def infer_return_type_func(f, input_types, debug=False, depth=0):
           from apache_beam.pvalue import Row
           if state.stack[-pop_count].value == Row:
             fields = state.stack[-1].value
-            return_type = row_type.RowTypeConstraint(
-                zip(fields, Const.unwrap_all(state.stack[-pop_count + 1:-1])))
+            return_type = row_type.RowTypeConstraint.from_fields(
+                list(
+                    zip(
+                        fields,
+                        Const.unwrap_all(state.stack[-pop_count + 1:-1]))))

Review Comment:
   the logic of this line was hard for me to follow.



##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint
+    if not self.element_type is RowTypeConstraint:
+      return None

Review Comment:
   can you add a comment as to why we return None here.



##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint

Review Comment:
   All other imports are at the top of the file. Why is this import inlined?



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
 
 # pytype: skip-file
 
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy
+    some assumptions:
+
+    - **to:** We assume that the user type can be constructed with field values
+      in order.
+    - **from:** We assume that field values can be accessed from instances of
+      the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+    The RowTypeConstraint constructor should not be called directly (even
+    internally to Beam). Prefer static methods ``from_user_type`` or
+    ``from_fields``.
+
+    Parameters:
+      fields: a list of (name, type) tuples, representing the schema inferred
+        from user_type.
+      user_type: constructor for a user type (e.g. NamedTuple class) that is
+        used to represent this schema in user code.
+    """
+    # Recursively wrap row types in a RowTypeConstraint
+    self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+                         for name,
+                         typ in fields)
+
+    self._user_type = user_type
+    if self._user_type is not None and hasattr(self._user_type,
+                                               _BEAM_SCHEMA_ID):
+      self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+    else:
+      self._schema_id = None

Review Comment:
   Not sure if it's cleaner.
   
   getattr(obj, 'attr', lambda: None)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1167914248

   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on pull request #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1167776291

   Run XVR_Flink PostCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r912143324


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,94 @@
 
 # pytype: skip-file
 
+from __future__ import annotations
+
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy

Review Comment:
   Thanks for asking this. I originally wrote this docstring when I was adding support for dataclasses. I was trying to communicate that both dataclass instances and NamedTuple instances can be constructed (to) and consumed (from) in the same way. We assume that these will work elsewhere (e.g. in `RowCoder`), so I wanted to document the assumption here.
   
   Anyway that framing was confusing for this PR since I dropped the dataclass support, so I rephrased it. Hopefully that helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] codecov[bot] commented on pull request #22066: WIP: Use RowTypeConstraint for normalizing schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1167881826

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22066](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c8aee04) into [master](https://codecov.io/gh/apache/beam/commit/93b38c8fc5950f813c99da8ed13749146cc508d4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (93b38c8) will **decrease** coverage by `0.01%`.
   > The diff coverage is `87.50%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22066      +/-   ##
   ==========================================
   - Coverage   74.01%   73.99%   -0.02%     
   ==========================================
     Files         703      703              
     Lines       92936    92983      +47     
   ==========================================
   + Hits        68786    68805      +19     
   - Misses      22884    22912      +28     
     Partials     1266     1266              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.57% <87.50%> (-0.04%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/pvalue.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcHZhbHVlLnB5) | `90.18% <33.33%> (-1.27%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/schemas.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3NjaGVtYXMucHk=) | `93.33% <87.87%> (-1.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/row\_type.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3Jvd190eXBlLnB5) | `95.74% <96.77%> (+6.27%)` | :arrow_up: |
   | [.../python/apache\_beam/typehints/trivial\_inference.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3RyaXZpYWxfaW5mZXJlbmNlLnB5) | `96.41% <100.00%> (ø)` | |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.01% <0.00%> (-1.39%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.33%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.57% <0.00%> (-0.57%)` | :arrow_down: |
   | ... and [4 more](https://codecov.io/gh/apache/beam/pull/22066/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [93b38c8...c8aee04](https://codecov.io/gh/apache/beam/pull/22066?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22066:
URL: https://github.com/apache/beam/pull/22066#issuecomment-1168081649

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910261330


##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -297,8 +300,12 @@ def typing_from_runner_api(
           self.typing_from_runner_api(fieldtype_proto.map_type.value_type)]
     elif type_info == "row_type":
       schema = fieldtype_proto.row_type.schema
+      # First look for user type in the registray

Review Comment:
   nit:
   ```suggestion
         # First look for user type in the registry
   ```



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -83,6 +83,7 @@
 from apache_beam.utils.timestamp import Timestamp
 
 PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
+PYTHON_USER_TYPE_OPTION_URN = "beam:python:user_type:v1"

Review Comment:
   I may have missed this, but where is this used?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
     result = self.by_id.get(unique_id, None)
     return result[1] if result is not None else None
 
+  def get_id_by_typing(self, typing):

Review Comment:
   Is this intended to be used? I didn't see any references



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Can you clarify the difference here by using the `self.typing_to_runner_api` vs the `typing_to_runner_api` outside of `SchemaTranslation`? similarly with `typing_from_runner_api`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910473626


##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
           self.producer.inputs)
     return self._windowing
 
+  @property
+  def schema_proto(self):
+    from apache_beam.typehints.row_type import RowTypeConstraint

Review Comment:
   I think this was just for expediency as I was hacking. Sometimes this is done to avoid circular imports, but I don't think that's the case here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit merged pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged PR #22066:
URL: https://github.com/apache/beam/pull/22066


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] yeandy commented on a diff in pull request #22066: Python: Use RowTypeConstraint for normalizing all schema-inferrable user types

Posted by GitBox <gi...@apache.org>.
yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r912078084


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,94 @@
 
 # pytype: skip-file
 
+from __future__ import annotations
+
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy

Review Comment:
   Can you clarify? I see the `from_user_type` function to convert from user type?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Ah, thanks, I see the distinction now. I see the `schema_registry` arg in the second pass 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org