You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by GitBox <> on 2021/11/03 00:19:37 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #15472: Add cloudpickle as optional library

tvalentyn commented on a change in pull request #15472:

File path: sdks/python/apache_beam/coders/
@@ -680,14 +682,14 @@ def iterable_state_read(token, element_coder_impl):
     context = pipeline_context.PipelineContext(
-    self.check_coder(
-        coder, [1, 2, 3], context=context, test_size_estimation=False)
+    # Note: do not use check_coder see

Review comment:
       Can you think of a pipeline where this will be a problem?

File path: sdks/python/apache_beam/internal/
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Pickler for values, functions, and classes.
+For internal use only. No backwards compatibility guarantees.
+Uses the cloudpickle library to pickle data, functions, lambdas
+and classes.
+dump_session and load_session are no ops.
+# pytype: skip-file
+import base64
+import bz2
+import io
+import logging
+import sys
+import threading
+import traceback
+import types
+import zlib
+from typing import Any
+from typing import Dict
+from typing import Tuple
+from _thread import RLock as RLockType
+  from absl import flags
+except (ImportError, ModuleNotFoundError):
+  pass
+import cloudpickle
+# Pickling, especially unpickling, causes broken module imports on Python 3
+# if executed concurrently, see: BEAM-8651,
+_pickle_lock = threading.RLock()
+import __main__ as _main_module
+def dumps(o, enable_trace=True, use_zlib=False):
+  # type: (...) -> bytes
+  """For internal use only; no backwards-compatibility guarantees."""
+  with _pickle_lock:
+    with io.BytesIO() as file:
+      pickler = cloudpickle.CloudPickler(file)

Review comment:
       It may be possible to "unpatch" dill's changes to the dispatch table at this point. In an internal codebase I came across this helper:
   def _unpatch_dill():
       import dill
   which may help prevent interference if dill somehow gets pulled in by Beam or another dependency.

File path: sdks/python/apache_beam/internal/
@@ -28,286 +28,46 @@
 the coders.*PickleCoder classes should be used instead.
-# pytype: skip-file
+from apache_beam.internal import cloudpickle_pickler
+from apache_beam.internal import dill_pickler
-import base64
-import bz2
-import logging
-import sys
-import threading
-import traceback
-import types
-import zlib
-from typing import Any
-from typing import Dict
-from typing import Tuple
+pickler_lib = dill_pickler
-import dill
-settings = {'dill_byref': None}
-class _NoOpContextManager(object):
-  def __enter__(self):
-    pass
-  def __exit__(self, *unused_exc_info):
-    pass
-# Pickling, especially unpickling, causes broken module imports on Python 3
-# if executed concurrently, see: BEAM-8651,
-_pickle_lock = threading.RLock()
-# Dill 0.28.0 renamed dill.dill to dill._dill:
-# TODO: Remove this once Beam depends on dill >= 0.2.8
-if not getattr(dill, 'dill', None):
-  dill.dill = dill._dill
-  sys.modules['dill.dill'] = dill._dill
-# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8
-if not getattr(dill, '_dill', None):
-  dill._dill = dill.dill
-  sys.modules['dill._dill'] = dill.dill
-def _is_nested_class(cls):
-  """Returns true if argument is a class object that appears to be nested."""
-  return (
-      isinstance(cls, type) and cls.__module__ is not None and
-      cls.__module__ != 'builtins' and
-      cls.__name__ not in sys.modules[cls.__module__].__dict__)
-def _find_containing_class(nested_class):
-  """Finds containing class of a nested class passed as argument."""
-  seen = set()
-  def _find_containing_class_inner(outer):
-    if outer in seen:
-      return None
-    seen.add(outer)
-    for k, v in outer.__dict__.items():
-      if v is nested_class:
-        return outer, k
-      elif isinstance(v, type) and hasattr(v, '__dict__'):
-        res = _find_containing_class_inner(v)
-        if res: return res
-  return _find_containing_class_inner(sys.modules[nested_class.__module__])
-def _nested_type_wrapper(fun):
-  """A wrapper for the standard pickler handler for class objects.
-  Args:
-    fun: Original pickler handler for type objects.
-  Returns:
-    A wrapper for type objects that handles nested classes.
-  The wrapper detects if an object being pickled is a nested class object.
-  For nested class object only it will save the containing class object so
-  the nested structure is recreated during unpickle.
-  """
-  def wrapper(pickler, obj):
-    # When the nested class is defined in the __main__ module we do not have to
-    # do anything special because the pickler itself will save the constituent
-    # parts of the type (i.e., name, base classes, dictionary) and then
-    # recreate it during unpickling.
-    if _is_nested_class(obj) and obj.__module__ != '__main__':
-      containing_class_and_name = _find_containing_class(obj)
-      if containing_class_and_name is not None:
-        return pickler.save_reduce(getattr, containing_class_and_name, obj=obj)
-    try:
-      return fun(pickler, obj)
-    except dill.dill.PicklingError:
-      # pylint: disable=protected-access
-      return pickler.save_reduce(
-          dill.dill._create_type,
-          (
-              type(obj),
-              obj.__name__,
-              obj.__bases__,
-              dill.dill._dict_from_dictproxy(obj.__dict__)),
-          obj=obj)
-      # pylint: enable=protected-access
-  return wrapper
-# Monkey patch the standard pickler dispatch table entry for type objects.
-# Dill, for certain types, defers to the standard pickler (including type
-# objects). We wrap the standard handler using type_wrapper() because
-# for nested class we want to pickle the actual enclosing class object so we
-# can recreate it during unpickling.
-# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
-dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
-    dill.dill.Pickler.dispatch[type])
-# Dill pickles generators objects without complaint, but unpickling produces
-# TypeError: object.__new__(generator) is not safe, use generator.__new__()
-# on some versions of Python.
-def _reject_generators(unused_pickler, unused_obj):
-  raise TypeError("can't (safely) pickle generator objects")
-dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators
-# This if guards against dill not being full initialized when generating docs.
-if 'save_module' in dir(dill.dill):
-  # Always pickle non-main modules by name.
-  old_save_module = dill.dill.save_module
-  @dill.dill.register(dill.dill.ModuleType)
-  def save_module(pickler, obj):
-    if dill.dill.is_dill(pickler) and obj is pickler._main:
-      return old_save_module(pickler, obj)
-    else:
-'M2: %s' % obj)
-      # pylint: disable=protected-access
-      pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj)
-      # pylint: enable=protected-access
-'# M2')
-  # Pickle module dictionaries (commonly found in lambda's globals)
-  # by referencing their module.
-  old_save_module_dict = dill.dill.save_module_dict
-  known_module_dicts = {
-  }  # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]]
-  @dill.dill.register(dict)
-  def new_save_module_dict(pickler, obj):
-    obj_id = id(obj)
-    if not known_module_dicts or '__file__' in obj or '__package__' in obj:
-      if obj_id not in known_module_dicts:
-        # Trigger loading of lazily loaded modules (such as pytest vendored
-        # modules).
-        # This pass over sys.modules needs to iterate on a copy of sys.modules
-        # since lazy loading modifies the dictionary, hence the use of list().
-        for m in list(sys.modules.values()):
-          try:
-            _ = m.__dict__
-          except AttributeError:
-            pass
-        for m in list(sys.modules.values()):
-          try:
-            if (m and m.__name__ != '__main__' and
-                isinstance(m, dill.dill.ModuleType)):
-              d = m.__dict__
-              known_module_dicts[id(d)] = m, d
-          except AttributeError:
-            # Skip modules that do not have the __name__ attribute.
-            pass
-    if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
-      m = known_module_dicts[obj_id][0]
-      try:
-        # pylint: disable=protected-access
-        dill.dill._import_module(m.__name__)
-        return pickler.save_reduce(
-            getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
-      except (ImportError, AttributeError):
-        return old_save_module_dict(pickler, obj)
-    else:
-      return old_save_module_dict(pickler, obj)
-  dill.dill.save_module_dict = new_save_module_dict
-  def _nest_dill_logging():
-    """Prefix all dill logging with its depth in the callstack.
-    Useful for debugging pickling of deeply nested structures.
-    """
-    old_log_info =
-    def new_log_info(msg, *args, **kwargs):
-      old_log_info(
-          ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
-          *args,
-          **kwargs)
- = new_log_info
-# Turn off verbose logging from the dill pickler.
 def dumps(o, enable_trace=True, use_zlib=False):
   # type: (...) -> bytes
-  """For internal use only; no backwards-compatibility guarantees."""
-  with _pickle_lock:
-    try:
-      s = dill.dumps(o, byref=settings['dill_byref'])
-    except Exception:  # pylint: disable=broad-except
-      if enable_trace:
-        dill.dill._trace(True)  # pylint: disable=protected-access
-        s = dill.dumps(o, byref=settings['dill_byref'])
-      else:
-        raise
-    finally:
-      dill.dill._trace(False)  # pylint: disable=protected-access
-  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
-  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
-  # limits.
-  # WARNING: Be cautious about compressor change since it can lead to pipeline
-  # representation change, and can break streaming job update compatibility on
-  # runners such as Dataflow.
-  if use_zlib:
-    c = zlib.compress(s, 9)
-  else:
-    c = bz2.compress(s, compresslevel=9)
-  del s  # Free up some possibly large and no-longer-needed memory.
-  return base64.b64encode(c)
+  return pickler_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib)
 def loads(encoded, enable_trace=True, use_zlib=False):
   """For internal use only; no backwards-compatibility guarantees."""
-  c = base64.b64decode(encoded)
-  if use_zlib:
-    s = zlib.decompress(c)
-  else:
-    s = bz2.decompress(c)
-  del c  # Free up some possibly large and no-longer-needed memory.
-  with _pickle_lock:
-    try:
-      return dill.loads(s)
-    except Exception:  # pylint: disable=broad-except
-      if enable_trace:
-        dill.dill._trace(True)  # pylint: disable=protected-access
-        return dill.loads(s)
-      else:
-        raise
-    finally:
-      dill.dill._trace(False)  # pylint: disable=protected-access
+  return pickler_lib.loads(encoded, enable_trace=enable_trace,
+                           use_zlib=use_zlib)
 def dump_session(file_path):
   """For internal use only; no backwards-compatibility guarantees.
   Pickle the current python session to be used in the worker.
-  Note: Due to the inconsistency in the first dump of dill dump_session we
-  create and load the dump twice to have consistent results in the worker and
-  the running session. Check:
-  with _pickle_lock:
-    dill.dump_session(file_path)
-    dill.load_session(file_path)
-    return dill.dump_session(file_path)
+  return pickler_lib.dump_session(file_path)
 def load_session(file_path):
-  with _pickle_lock:
-    return dill.load_session(file_path)
+  return pickler_lib.load_session(file_path)
+def change_pickle_lib(pickle_lib):
+  """ Changes pickling library. Users should prefer the default library."""
+  global pickler_lib

Review comment:
       1. How about we name the parameter `desired_pickler` instead of `pickle_lib`? `pickle_lib` differs from the global `pickler_lib` by only one letter, which invites typos and confusion.
   2. It seems like the condition below should be inverted.
   3. Let's rename the function to `set_pickle_lib`. That avoids confusion with a toggle-style usage that flips the current value without taking the desired value into account.

File path: sdks/python/apache_beam/internal/
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for the cloudpickle_pickler module."""
+# pytype: skip-file
+import sys
+import types
+import unittest
+from apache_beam.internal import module_test
+from apache_beam.internal.cloudpickle_pickler import dumps
+from apache_beam.internal.cloudpickle_pickler import loads
+class PicklerTest(unittest.TestCase):
+  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
+  def test_basics(self):
+    self.assertEqual([1, 'a', (u'z', )], loads(dumps([1, 'a', (u'z', )])))
+    fun = lambda x: 'xyz-%s' % x
+    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))
+  def test_lambda_with_globals(self):
+    """Tests that the globals of a function are preserved."""
+    # The point of the test is that the lambda being called after unpickling
+    # relies on having the re module being loaded.
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(
+                         module_test.get_lambda_with_globals()))('abc def'))
+  def test_lambda_with_main_globals(self):
+    self.assertEqual(unittest, loads(dumps(lambda: unittest))())
+  def test_lambda_with_closure(self):
+    """Tests that the closure of a function is preserved."""
+    self.assertEqual(
+        'closure: abc',
+        loads(dumps(module_test.get_lambda_with_closure('abc')))())
+  def test_class(self):
+    """Tests that a class object is pickled correctly."""
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.Xyz))().foo('abc def'))
+  def test_object(self):
+    """Tests that a class instance is pickled correctly."""
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
+  def test_nested_class(self):
+    """Tests that a nested class object is pickled correctly."""
+    self.assertEqual(
+        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
+    self.assertEqual(
+        'Y:abc',
+        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
+  def test_dynamic_class(self):
+    """Tests that a nested class object is pickled correctly."""
+    self.assertEqual(
+        'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
+  def test_generators(self):
+    with self.assertRaises(TypeError):
+      dumps((_ for _ in range(10)))
+  def test_recursive_class(self):
+    self.assertEqual(
+        'RecursiveClass:abc',
+        loads(dumps(module_test.RecursiveClass('abc').datum)))
+  def test_function_with_external_reference(self):
+    out_of_scope_var = 'expected_value'
+    def foo():
+      return out_of_scope_var
+    self.assertEqual('expected_value', loads(dumps(foo))())
+  @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')

Review comment:
       When would this be skipped?

File path: sdks/python/apache_beam/internal/
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for the cloudpickle_pickler module."""
+# pytype: skip-file
+import sys
+import types
+import unittest
+from apache_beam.internal import module_test
+from apache_beam.internal.cloudpickle_pickler import dumps
+from apache_beam.internal.cloudpickle_pickler import loads
+class PicklerTest(unittest.TestCase):
+  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
+  def test_basics(self):
+    self.assertEqual([1, 'a', (u'z', )], loads(dumps([1, 'a', (u'z', )])))
+    fun = lambda x: 'xyz-%s' % x
+    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))
+  def test_lambda_with_globals(self):
+    """Tests that the globals of a function are preserved."""
+    # The point of the test is that the lambda being called after unpickling
+    # relies on having the re module being loaded.
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(
+                         module_test.get_lambda_with_globals()))('abc def'))
+  def test_lambda_with_main_globals(self):
+    self.assertEqual(unittest, loads(dumps(lambda: unittest))())
+  def test_lambda_with_closure(self):
+    """Tests that the closure of a function is preserved."""
+    self.assertEqual(
+        'closure: abc',
+        loads(dumps(module_test.get_lambda_with_closure('abc')))())
+  def test_class(self):
+    """Tests that a class object is pickled correctly."""
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.Xyz))().foo('abc def'))
+  def test_object(self):
+    """Tests that a class instance is pickled correctly."""
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
+  def test_nested_class(self):
+    """Tests that a nested class object is pickled correctly."""
+    self.assertEqual(
+        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
+    self.assertEqual(
+        'Y:abc',
+        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
+  def test_dynamic_class(self):
+    """Tests that a nested class object is pickled correctly."""

Review comment:
       nit: duplicated docstring? It repeats the one on `test_nested_class`.
   You could also name the test so that it captures the behavior; then the docstring becomes unnecessary. E.g. something like:

File path: sdks/python/apache_beam/internal/
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Pickler for values, functions, and classes.
+For internal use only. No backwards compatibility guarantees.
+Uses the cloudpickle library to pickle data, functions, lambdas
+and classes.
+dump_session and load_session are no ops.
+# pytype: skip-file
+import base64
+import bz2
+import io
+import logging
+import sys
+import threading
+import traceback
+import types
+import zlib
+from typing import Any
+from typing import Dict
+from typing import Tuple
+from _thread import RLock as RLockType
+  from absl import flags
+except (ImportError, ModuleNotFoundError):
+  pass
+import cloudpickle
+# Pickling, especially unpickling, causes broken module imports on Python 3
+# if executed concurrently, see: BEAM-8651,
+_pickle_lock = threading.RLock()
+import __main__ as _main_module
+def dumps(o, enable_trace=True, use_zlib=False):
+  # type: (...) -> bytes
+  """For internal use only; no backwards-compatibility guarantees."""
+  with _pickle_lock:
+    with io.BytesIO() as file:
+      pickler = cloudpickle.CloudPickler(file)
+      pickler.dispatch_table[RLockType] = _pickle_rlock
+      try:
+        pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
+      except NameError:
+        pass
+      pickler.dump(o)
+      s = file.getvalue()
+      # TODO(ryanthompson): See if echoing dill.enable_trace is useful.
+  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
+  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
+  # limits.
+  # WARNING: Be cautious about compressor change since it can lead to pipeline
+  # representation change, and can break streaming job update compatibility on
+  # runners such as Dataflow.
+  if use_zlib:
+    c = zlib.compress(s, 9)
+  else:
+    c = bz2.compress(s, compresslevel=9)
+  del s  # Free up some possibly large and no-longer-needed memory.
+  return base64.b64encode(c)
+def loads(encoded, enable_trace=True, use_zlib=False):
+  """For internal use only; no backwards-compatibility guarantees."""
+  c = base64.b64decode(encoded)
+  if use_zlib:
+    s = zlib.decompress(c)
+  else:
+    s = bz2.decompress(c)
+  del c  # Free up some possibly large and no-longer-needed memory.
+  with _pickle_lock:
+    unpickled = cloudpickle.loads(s)
+    return unpickled
+def _pickle_rlock(obj):
+  return _create_rlock, tuple([])
+def _create_rlock():
+  return RLockType()

Review comment:
       Is this the same as what dill currently does?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail:

For queries about this service, please contact Infrastructure at: