Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/11 22:29:41 UTC

[GitHub] [beam] yeandy commented on a diff in pull request #22679: Add GeneratedClassRowTypeConstraint

yeandy commented on code in PR #22679:
URL: https://github.com/apache/beam/pull/22679#discussion_r943969185


##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -572,5 +563,53 @@ def test_schema_with_bad_field_raises_helpful_error(self):
             schema_registry=SchemaTypeRegistry()))
 
 
+@parameterized_class([
+    {
+        'pickler': pickle,
+    },
+    {
+        'pickler': dill,
+    },
+    {
+        'pickler': cloudpickle,
+    },
+])
+class PickleTest(unittest.TestCase):
+  def test_generated_class_pickle_instance(self):
+    schema = schema_pb2.Schema(
+        id="some-uuid",
+        fields=[
+            schema_pb2.Field(
+                name='name',
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+            )
+        ])
+    user_type = named_tuple_from_schema(schema)
+    instance = user_type(name="test")
+
+    self.assertEqual(instance, self.pickler.loads(self.pickler.dumps(instance)))
+
+  def test_generated_class_row_type_pickle(self):
+    row_proto = schema_pb2.FieldType(
+        row_type=schema_pb2.RowType(

Review Comment:
   Why does `schema_pb2.RowType` have to be wrapped by `schema_pb2.FieldType`?



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__

Review Comment:
   > the generated user_type cannot be pickled
   
   Can you please explain why this is the case?
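
One plausible reason, offered here as an assumption rather than as the PR author's answer: the standard `pickle` module serializes a class by reference (module plus qualified name), so a class created at runtime and never bound to a module-level name cannot be looked up again. A minimal sketch, where `make_row_type` is a hypothetical stand-in for `named_tuple_from_schema`:

```python
import pickle
from typing import NamedTuple


def make_row_type(field_names):
  # Hypothetical stand-in for named_tuple_from_schema: the class is built at
  # runtime and is never assigned to a module-level name matching its __name__.
  return NamedTuple('BeamSchema_some_uuid', [(n, str) for n in field_names])


user_type = make_row_type(['name'])

try:
  # pickle stores classes by reference, so it tries to find
  # 'BeamSchema_some_uuid' as a module attribute and fails.
  pickle.dumps(user_type)
  picklable = True
except (pickle.PicklingError, AttributeError):
  picklable = False
```

Under that assumption, a custom `__reduce__` that rebuilds the class from its fields sidesteps the lookup entirely.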



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -112,8 +109,19 @@ def from_user_type(
     return None
 
   @staticmethod
-  def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
-    return RowTypeConstraint(fields=fields, user_type=None)
+  def from_fields(
+      fields: Sequence[Tuple[str, type]],
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,
+  ) -> RowTypeConstraint:

Review Comment:
   ```suggestion
     ) -> GeneratedClassRowTypeConstraint:
   ```



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__
+  function that will regenerate the user_type.
+  """
+  def __init__(
+      self,
+      fields,
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,
+  ):
+    from apache_beam.typehints.schemas import named_fields_to_schema
+    from apache_beam.typehints.schemas import named_tuple_from_schema
+
+    if schema_registry is None:
+      kwargs = {}
+    else:
+      kwargs = {'schema_registry': schema_registry}
+
+    schema = named_fields_to_schema(
+        fields,
+        schema_id=schema_id,
+        schema_options=schema_options,
+        field_options=field_options,
+        **kwargs)
+    user_type = named_tuple_from_schema(schema, **kwargs)
+    setattr(user_type, _BEAM_SCHEMA_ID, schema_id)
+
+    super().__init__(
+        fields,
+        user_type,
+        schema_options=schema_options,
+        field_options=field_options)
+
+  def __reduce__(self):
+    return (
+        RowTypeConstraint.from_fields,
+        (
+            self._fields,
+            self._schema_id,
+            self._schema_options,
+            self._field_options))

Review Comment:
   Nice. Do you think it would be clearer to explicitly pass `None` as the 5th item (`schema_registry`)? I was initially looking for the 5 args of `from_fields`, but only saw 4 😄. I'm perfectly OK with the current implementation though.
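
For illustration, here is a minimal sketch of what spelling out the fifth argument could look like. `RowConstraintSketch` is a hypothetical stand-in, not the class from the PR; it only mirrors the argument order of `from_fields` shown in the diff.

```python
class RowConstraintSketch:
  """Hypothetical stand-in for GeneratedClassRowTypeConstraint."""
  def __init__(
      self,
      fields,
      schema_id=None,
      schema_options=None,
      field_options=None,
      schema_registry=None):
    self._fields = tuple(fields)
    self._schema_id = schema_id
    self._schema_options = schema_options
    self._field_options = field_options

  @staticmethod
  def from_fields(
      fields,
      schema_id=None,
      schema_options=None,
      field_options=None,
      schema_registry=None):
    return RowConstraintSketch(
        fields, schema_id, schema_options, field_options, schema_registry)

  def __reduce__(self):
    # (callable, args): unpickling calls callable(*args) to rebuild the object.
    # The trailing None makes the schema_registry argument explicit.
    return (
        RowConstraintSketch.from_fields,
        (
            self._fields,
            self._schema_id,
            self._schema_options,
            self._field_options,
            None))


original = RowConstraintSketch([('name', str)], schema_id='some-uuid')
# Apply the reduce tuple by hand, which is what an unpickler does.
rebuild, args = original.__reduce__()
clone = rebuild(*args)
```

Since `schema_registry` already defaults to `None`, both forms rebuild the same object; the explicit version only makes the argument count match the signature.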



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__
+  function that will regenerate the user_type.
+  """
+  def __init__(
+      self,
+      fields,
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,

Review Comment:
   ```suggestion
         schema_registry: Optional[SchemaTypeRegistry] = None,
   ```



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -112,8 +109,19 @@ def from_user_type(
     return None
 
   @staticmethod
-  def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
-    return RowTypeConstraint(fields=fields, user_type=None)
+  def from_fields(
+      fields: Sequence[Tuple[str, type]],
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,

Review Comment:
   Would this fix this lint issue?
   ```
   18:16:14 Running flake8...
   18:16:24 apache_beam/typehints/row_type.py:116:24: F821 undefined name 'SchemaTypeRegistry'
   18:16:24       schema_registry: SchemaTypeRegistry = None,
   18:16:24                        ^
   18:16:24 apache_beam/typehints/row_type.py:183:24: F821 undefined name 'SchemaTypeRegistry'
   18:16:24       schema_registry: SchemaTypeRegistry = None,
   18:16:24                        ^
   18:16:24 2     F821 undefined name 'SchemaTypeRegistry'
   ```
   
   ```suggestion
         schema_registry: Optional[SchemaTypeRegistry] = None,
   ```
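
A note on the lint output above: F821 means the name is not defined in the module, and wrapping the annotation in `Optional[...]` does not change that. One common remedy, sketched here on the assumption that a module-level import of `SchemaTypeRegistry` would create a circular import (the function-local imports in `__init__` hint at this), is an import guarded by `typing.TYPE_CHECKING` plus a string annotation. The sketch uses `decimal.Decimal` as a stand-in so that it runs without Beam, and `from_fields_sketch` is a hypothetical name.

```python
from typing import TYPE_CHECKING
from typing import Optional

if TYPE_CHECKING:
  # Seen by linters and type checkers, skipped at runtime, so it cannot
  # introduce an import cycle. Decimal stands in for SchemaTypeRegistry.
  from decimal import Decimal


def from_fields_sketch(fields, schema_registry: Optional["Decimal"] = None):
  # The quoted annotation is not evaluated at definition time, so the
  # missing runtime binding for Decimal is not a problem.
  return (tuple(fields), schema_registry)


result = from_fields_sketch([('name', str)])
```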



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom __reduce__
+  function that will regenerate the user_type.
+  """
+  def __init__(
+      self,
+      fields,
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,
+  ):
+    from apache_beam.typehints.schemas import named_fields_to_schema
+    from apache_beam.typehints.schemas import named_tuple_from_schema
+
+    if schema_registry is None:
+      kwargs = {}
+    else:
+      kwargs = {'schema_registry': schema_registry}

Review Comment:
   ```suggestion
       kwargs = {'schema_registry': schema_registry} if schema_registry else {}
   ```
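
One caveat with the one-line form: it tests truthiness, while the original tests `is None`. The two differ only if a registry object can be falsy. Whether `SchemaTypeRegistry` can be is not shown in this diff, so the sketch below uses a hypothetical `EmptyableRegistry` that defines `__len__` to illustrate the difference.

```python
class EmptyableRegistry:
  """Hypothetical registry; defining __len__ makes an empty instance falsy."""
  def __init__(self):
    self._by_id = {}

  def __len__(self):
    return len(self._by_id)


def kwargs_is_none(schema_registry=None):
  # Mirrors the original if/else: only None is treated as "not provided".
  if schema_registry is None:
    return {}
  return {'schema_registry': schema_registry}


def kwargs_truthy(schema_registry=None):
  # Mirrors the suggested one-liner: any falsy value is treated as missing.
  return {'schema_registry': schema_registry} if schema_registry else {}


registry = EmptyableRegistry()
kept = kwargs_is_none(registry)
dropped = kwargs_truthy(registry)
```

If the registry type never evaluates as falsy, the two forms behave identically; `if schema_registry is not None else {}` keeps the one-liner while preserving the original check.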



##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -572,5 +563,53 @@ def test_schema_with_bad_field_raises_helpful_error(self):
             schema_registry=SchemaTypeRegistry()))
 
 
+@parameterized_class([
+    {
+        'pickler': pickle,
+    },
+    {
+        'pickler': dill,
+    },
+    {
+        'pickler': cloudpickle,
+    },
+])
+class PickleTest(unittest.TestCase):
+  def test_generated_class_pickle_instance(self):
+    schema = schema_pb2.Schema(
+        id="some-uuid",
+        fields=[
+            schema_pb2.Field(
+                name='name',
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+            )
+        ])
+    user_type = named_tuple_from_schema(schema)
+    instance = user_type(name="test")
+
+    self.assertEqual(instance, self.pickler.loads(self.pickler.dumps(instance)))
+
+  def test_generated_class_row_type_pickle(self):
+    row_proto = schema_pb2.FieldType(
+        row_type=schema_pb2.RowType(

Review Comment:
   I looked again; I believe the top level of these protos is `schema_pb2.FieldType`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
