You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/12 17:04:12 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22679: Add GeneratedClassRowTypeConstraint

TheNeuralBit commented on code in PR #22679:
URL: https://github.com/apache/beam/pull/22679#discussion_r944628110


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -112,8 +109,19 @@ def from_user_type(
     return None
 
   @staticmethod
-  def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
-    return RowTypeConstraint(fields=fields, user_type=None)
+  def from_fields(
+      fields: Sequence[Tuple[str, type]],
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,

Review Comment:
   Oh I think that error may have showed up on a previous iteration, when I had accidentally left out the import.
   
   It's surprising that CI isn't complaining about the lack of Optional here though...



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__
+  function that will regenerate the user_type.
+  """
+  def __init__(
+      self,
+      fields,
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,
+  ):
+    from apache_beam.typehints.schemas import named_fields_to_schema
+    from apache_beam.typehints.schemas import named_tuple_from_schema
+
+    if schema_registry is None:
+      kwargs = {}
+    else:
+      kwargs = {'schema_registry': schema_registry}
+
+    schema = named_fields_to_schema(
+        fields,
+        schema_id=schema_id,
+        schema_options=schema_options,
+        field_options=field_options,
+        **kwargs)
+    user_type = named_tuple_from_schema(schema, **kwargs)
+    setattr(user_type, _BEAM_SCHEMA_ID, schema_id)
+
+    super().__init__(
+        fields,
+        user_type,
+        schema_options=schema_options,
+        field_options=field_options)
+
+  def __reduce__(self):
+    return (
+        RowTypeConstraint.from_fields,
+        (
+            self._fields,
+            self._schema_id,
+            self._schema_options,
+            self._field_options))

Review Comment:
   Good idea, thanks



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__

Review Comment:
   I actually don't fully understand it, but it's been a consistent issue with the Schema code. Each pickle library (built-in, dill, cloudpickle) fails for a different reason. I filed https://github.com/apache/beam/issues/22714 to track this, and added a (skipped) test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org