You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2022/10/21 21:19:44 UTC

[beam] branch master updated: Avoid pickling unstable reference to moved proto classes. (#23739)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd87491dcb Avoid pickling unstable reference to moved proto classes. (#23739)
8dd87491dcb is described below

commit 8dd87491dcb2660f859f87b3336adb984c62c12d
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Fri Oct 21 14:19:32 2022 -0700

    Avoid pickling unstable reference to moved proto classes. (#23739)
    
    CloudPickle notices that schema_pb2 (from the closure of the locally
    defined __reduce__ method) is not importable under its declared name
    (org.apache.beam...) and tries to pickle it, which fails because proto
    classes themselves are unpicklable. This change avoids that error by
    moving the __reduce__ definition out into a module-level factory,
    _named_tuple_reduce_method, whose closure captures only the serialized
    schema bytes.
---
 sdks/python/apache_beam/typehints/schemas.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 4371ca2de0d..9f57700cccd 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -463,12 +463,10 @@ class SchemaTranslation(object):
 
     # Define a reduce function, otherwise these types can't be pickled
     # (See BEAM-9574)
-    def __reduce__(self):
-      return (
-          _hydrate_namedtuple_instance,
-          (schema.SerializeToString(), tuple(self)))
-
-    setattr(user_type, '__reduce__', __reduce__)
+    setattr(
+        user_type,
+        '__reduce__',
+        _named_tuple_reduce_method(schema.SerializeToString()))
 
     self.schema_registry.add(user_type, schema)
     coders.registry.register_coder(user_type, coders.RowCoder)
@@ -476,6 +474,13 @@ class SchemaTranslation(object):
     return user_type
 
 
+def _named_tuple_reduce_method(serialized_schema):
+  def __reduce__(self):
+    return _hydrate_namedtuple_instance, (serialized_schema, tuple(self))
+
+  return __reduce__
+
+
 def _hydrate_namedtuple_instance(encoded_schema, values):
   return named_tuple_from_schema(
       proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema))(*values)