Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/17 06:41:34 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #15472: [BEAM-8123] Add cloudpickle as optional library

tvalentyn commented on a change in pull request #15472:
URL: https://github.com/apache/beam/pull/15472#discussion_r750907099



##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
##########
@@ -76,6 +77,8 @@ def create_harness(environment, dry_run=False):
   RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
   sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
   filesystems.FileSystems.set_options(sdk_pipeline_options)
+  pickler.set_pipeline_options(

Review comment:
       I'd rename this, since it sounds as if we were setting pipeline options. Alternatives to consider (see the sketch below this list):
   - call this method `pickler.set_options` (to mean that we set the pickler's options; in which case the corresponding pipeline option could be called `--pickle_options`, sample value `--pickle_options=use_dill`)
   - `pickler.set_pickle_library(pickle_library=sdk_pipeline_options.view_as(SetupOptions).pickle_library)` (my preference.)
   - `pickler.configure_from_options(pickle_options=sdk_pipeline_options.view_as(SetupOptions).pickle_options)`
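   
   As a concrete illustration of the second alternative, here is a minimal sketch of what the call site in `create_harness` could look like. `pickler.set_pickle_library` and the `pickle_library` setup option are the names proposed above, not APIs that exist in Beam at this point:
   
   ```python
   # Hypothetical call site in sdk_worker_main.create_harness(), using the
   # reviewer's proposed names; neither set_pickle_library nor the
   # pickle_library option exists yet in this PR.
   from apache_beam.internal import pickler
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.options.pipeline_options import SetupOptions
   
   def configure_pickler(pipeline_options_dict):
     sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
     pickler.set_pickle_library(
         sdk_pipeline_options.view_as(SetupOptions).pickle_library)
   ```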
   
   

##########
File path: sdks/python/container/base_image_requirements.txt
##########
@@ -26,6 +26,7 @@
 # TODO(AVRO-2429): Upgrade to >= 1.9.0 only after resolved
 avro-python3==1.8.2
 fastavro==1.0.0.post1
+cloudpickle==2.0.0

Review comment:
       You can revert the changes to this file, now that the file itself has been deleted.

##########
File path: sdks/python/apache_beam/internal/cloudpickle_pickler.py
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pickler for values, functions, and classes.
+
+For internal use only. No backwards compatibility guarantees.
+
+Uses the cloudpickle library to pickle data, functions, lambdas
+and classes.
+
+dump_session and load_session are no ops.

Review comment:
       ```suggestion
   dump_session and load_session are no-ops.
   ```

##########
File path: sdks/python/apache_beam/internal/pickler.py
##########
@@ -28,286 +28,65 @@
 the coders.*PickleCoder classes should be used instead.
 """
 
-# pytype: skip-file
+from apache_beam.internal import cloudpickle_pickler
+from apache_beam.internal import dill_pickler
 
-import base64
-import bz2
-import logging
-import sys
-import threading
-import traceback
-import types
-import zlib
-from typing import Any
-from typing import Dict
-from typing import Tuple
+USE_CLOUDPICKLE = 1
+USE_DILL = 2
+DEFAULT_PICKLE_LIB = USE_DILL
 
-import dill
-
-settings = {'dill_byref': None}
-
-
-class _NoOpContextManager(object):
-  def __enter__(self):
-    pass
-
-  def __exit__(self, *unused_exc_info):
-    pass
-
-
-# Pickling, especially unpickling, causes broken module imports on Python 3
-# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
-_pickle_lock = threading.RLock()
-# Dill 0.28.0 renamed dill.dill to dill._dill:
-# https://github.com/uqfoundation/dill/commit/f0972ecc7a41d0b8acada6042d557068cac69baa
-# TODO: Remove this once Beam depends on dill >= 0.2.8
-if not getattr(dill, 'dill', None):
-  dill.dill = dill._dill
-  sys.modules['dill.dill'] = dill._dill
-
-# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8
-if not getattr(dill, '_dill', None):
-  dill._dill = dill.dill
-  sys.modules['dill._dill'] = dill.dill
-
-
-def _is_nested_class(cls):
-  """Returns true if argument is a class object that appears to be nested."""
-  return (
-      isinstance(cls, type) and cls.__module__ is not None and
-      cls.__module__ != 'builtins' and
-      cls.__name__ not in sys.modules[cls.__module__].__dict__)
-
-
-def _find_containing_class(nested_class):
-  """Finds containing class of a nested class passed as argument."""
-
-  seen = set()
-
-  def _find_containing_class_inner(outer):
-    if outer in seen:
-      return None
-    seen.add(outer)
-    for k, v in outer.__dict__.items():
-      if v is nested_class:
-        return outer, k
-      elif isinstance(v, type) and hasattr(v, '__dict__'):
-        res = _find_containing_class_inner(v)
-        if res: return res
-
-  return _find_containing_class_inner(sys.modules[nested_class.__module__])
-
-
-def _nested_type_wrapper(fun):
-  """A wrapper for the standard pickler handler for class objects.
-
-  Args:
-    fun: Original pickler handler for type objects.
-
-  Returns:
-    A wrapper for type objects that handles nested classes.
-
-  The wrapper detects if an object being pickled is a nested class object.
-  For nested class object only it will save the containing class object so
-  the nested structure is recreated during unpickle.
-  """
-  def wrapper(pickler, obj):
-    # When the nested class is defined in the __main__ module we do not have to
-    # do anything special because the pickler itself will save the constituent
-    # parts of the type (i.e., name, base classes, dictionary) and then
-    # recreate it during unpickling.
-    if _is_nested_class(obj) and obj.__module__ != '__main__':
-      containing_class_and_name = _find_containing_class(obj)
-      if containing_class_and_name is not None:
-        return pickler.save_reduce(getattr, containing_class_and_name, obj=obj)
-    try:
-      return fun(pickler, obj)
-    except dill.dill.PicklingError:
-      # pylint: disable=protected-access
-      return pickler.save_reduce(
-          dill.dill._create_type,
-          (
-              type(obj),
-              obj.__name__,
-              obj.__bases__,
-              dill.dill._dict_from_dictproxy(obj.__dict__)),
-          obj=obj)
-      # pylint: enable=protected-access
-
-  return wrapper
-
-
-# Monkey patch the standard pickler dispatch table entry for type objects.
-# Dill, for certain types, defers to the standard pickler (including type
-# objects). We wrap the standard handler using type_wrapper() because
-# for nested class we want to pickle the actual enclosing class object so we
-# can recreate it during unpickling.
-# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
-dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
-    dill.dill.Pickler.dispatch[type])
-
-
-# Dill pickles generators objects without complaint, but unpickling produces
-# TypeError: object.__new__(generator) is not safe, use generator.__new__()
-# on some versions of Python.
-def _reject_generators(unused_pickler, unused_obj):
-  raise TypeError("can't (safely) pickle generator objects")
-
-
-dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators
-
-# This if guards against dill not being full initialized when generating docs.
-if 'save_module' in dir(dill.dill):
-
-  # Always pickle non-main modules by name.
-  old_save_module = dill.dill.save_module
-
-  @dill.dill.register(dill.dill.ModuleType)
-  def save_module(pickler, obj):
-    if dill.dill.is_dill(pickler) and obj is pickler._main:
-      return old_save_module(pickler, obj)
-    else:
-      dill.dill.log.info('M2: %s' % obj)
-      # pylint: disable=protected-access
-      pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj)
-      # pylint: enable=protected-access
-      dill.dill.log.info('# M2')
-
-  # Pickle module dictionaries (commonly found in lambda's globals)
-  # by referencing their module.
-  old_save_module_dict = dill.dill.save_module_dict
-  known_module_dicts = {
-  }  # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]]
-
-  @dill.dill.register(dict)
-  def new_save_module_dict(pickler, obj):
-    obj_id = id(obj)
-    if not known_module_dicts or '__file__' in obj or '__package__' in obj:
-      if obj_id not in known_module_dicts:
-        # Trigger loading of lazily loaded modules (such as pytest vendored
-        # modules).
-        # This pass over sys.modules needs to iterate on a copy of sys.modules
-        # since lazy loading modifies the dictionary, hence the use of list().
-        for m in list(sys.modules.values()):
-          try:
-            _ = m.__dict__
-          except AttributeError:
-            pass
-
-        for m in list(sys.modules.values()):
-          try:
-            if (m and m.__name__ != '__main__' and
-                isinstance(m, dill.dill.ModuleType)):
-              d = m.__dict__
-              known_module_dicts[id(d)] = m, d
-          except AttributeError:
-            # Skip modules that do not have the __name__ attribute.
-            pass
-    if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
-      m = known_module_dicts[obj_id][0]
-      try:
-        # pylint: disable=protected-access
-        dill.dill._import_module(m.__name__)
-        return pickler.save_reduce(
-            getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
-      except (ImportError, AttributeError):
-        return old_save_module_dict(pickler, obj)
-    else:
-      return old_save_module_dict(pickler, obj)
-
-  dill.dill.save_module_dict = new_save_module_dict
-
-  def _nest_dill_logging():
-    """Prefix all dill logging with its depth in the callstack.
-
-    Useful for debugging pickling of deeply nested structures.
-    """
-    old_log_info = dill.dill.log.info
-
-    def new_log_info(msg, *args, **kwargs):
-      old_log_info(
-          ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
-          *args,
-          **kwargs)
-
-    dill.dill.log.info = new_log_info
-
-
-# Turn off verbose logging from the dill pickler.
-logging.getLogger('dill').setLevel(logging.WARN)
+desired_pickle_lib = None
 
 
 def dumps(o, enable_trace=True, use_zlib=False):
   # type: (...) -> bytes
 
-  """For internal use only; no backwards-compatibility guarantees."""
-  with _pickle_lock:
-    try:
-      s = dill.dumps(o, byref=settings['dill_byref'])
-    except Exception:  # pylint: disable=broad-except
-      if enable_trace:
-        dill.dill._trace(True)  # pylint: disable=protected-access
-        s = dill.dumps(o, byref=settings['dill_byref'])
-      else:
-        raise
-    finally:
-      dill.dill._trace(False)  # pylint: disable=protected-access
-
-  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
-  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
-  # limits.
-  # WARNING: Be cautious about compressor change since it can lead to pipeline
-  # representation change, and can break streaming job update compatibility on
-  # runners such as Dataflow.
-  if use_zlib:
-    c = zlib.compress(s, 9)
-  else:
-    c = bz2.compress(s, compresslevel=9)
-  del s  # Free up some possibly large and no-longer-needed memory.
-
-  return base64.b64encode(c)
+  return desired_pickle_lib.dumps(
+      o, enable_trace=enable_trace, use_zlib=use_zlib)
 
 
 def loads(encoded, enable_trace=True, use_zlib=False):
   """For internal use only; no backwards-compatibility guarantees."""
 
-  c = base64.b64decode(encoded)
-
-  if use_zlib:
-    s = zlib.decompress(c)
-  else:
-    s = bz2.decompress(c)
-
-  del c  # Free up some possibly large and no-longer-needed memory.
-
-  with _pickle_lock:
-    try:
-      return dill.loads(s)
-    except Exception:  # pylint: disable=broad-except
-      if enable_trace:
-        dill.dill._trace(True)  # pylint: disable=protected-access
-        return dill.loads(s)
-      else:
-        raise
-    finally:
-      dill.dill._trace(False)  # pylint: disable=protected-access
+  return desired_pickle_lib.loads(
+      encoded, enable_trace=enable_trace, use_zlib=use_zlib)
 
 
 def dump_session(file_path):
   """For internal use only; no backwards-compatibility guarantees.
 
   Pickle the current python session to be used in the worker.
-
-  Note: Due to the inconsistency in the first dump of dill dump_session we
-  create and load the dump twice to have consistent results in the worker and
-  the running session. Check: https://github.com/uqfoundation/dill/issues/195
   """
-  with _pickle_lock:
-    dill.dump_session(file_path)
-    dill.load_session(file_path)
-    return dill.dump_session(file_path)
+
+  return desired_pickle_lib.dump_session(file_path)
 
 
 def load_session(file_path):
-  with _pickle_lock:
-    return dill.load_session(file_path)
+  return desired_pickle_lib.load_session(file_path)
+
+
+def set_pickle_lib(pickle_lib_enum):

Review comment:
       Is this meant to be a method internal to this module? If so, let's prefix it with an underscore (`_set_pickle_lib`). Same for the pickle constants/enums and global vars; a sketch follows below.
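   
   A sketch of what the module-internal naming could look like. The body of `set_pickle_lib` is not visible in this hunk, so the selection logic below is an assumption; only the names come from this PR:
   
   ```python
   # Underscore-prefixed names to signal that these are internal to pickler.py.
   # The selection logic is assumed, not copied from the PR.
   from apache_beam.internal import cloudpickle_pickler
   from apache_beam.internal import dill_pickler
   
   _USE_CLOUDPICKLE = 1
   _USE_DILL = 2
   _DEFAULT_PICKLE_LIB = _USE_DILL
   
   _desired_pickle_lib = None
   
   
   def _set_pickle_lib(pickle_lib_enum):
     global _desired_pickle_lib
     _desired_pickle_lib = (
         cloudpickle_pickler
         if pickle_lib_enum == _USE_CLOUDPICKLE else dill_pickler)
   ```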

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -1056,6 +1056,12 @@ def _add_argparse_args(cls, parser):
             'currently an experimental flag and provides no stability. '
             'Multiple --beam_plugin options can be specified if more than '
             'one plugin is needed.'))
+    parser.add_argument(
+        '--set_pickle_library',

Review comment:
       How about we call it `--pickle_library` or `--pickle_options`? A sketch of the rename is below.
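   
   For reference, a sketch of the SetupOptions argument under the proposed `--pickle_library` name. The default, choices, and help text below are placeholders for illustration, not what this PR adds:
   
   ```python
   # Hypothetical rename of the new flag; values are illustrative only.
   parser.add_argument(
       '--pickle_library',
       default='dill',
       choices=['cloudpickle', 'dill'],
       help='Chooses which pickle library to use for serializing user code.')
   ```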

##########
File path: sdks/python/apache_beam/coders/coders_test_common.py
##########
@@ -667,11 +667,13 @@ def test_state_backed_iterable_coder(self):
     state = {}
 
     def iterable_state_write(values, element_coder_impl):
+      global state

Review comment:
       What is the testing plan for this change? I think we need to find a way to run some tests on both picklers, perhaps similarly to how we parameterize the FnAPI runner tests, or by changing the default pickler in one of the tox configurations. A rough sketch of the first approach is below.
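   
   As one possible shape for such coverage, a rough sketch that round-trips a value under each pickler via the `set_pickle_lib` switch introduced in this PR. The test class and assertions are illustrative only:
   
   ```python
   import unittest
   
   from apache_beam.internal import pickler
   
   
   class PicklerSwitchTest(unittest.TestCase):
     def _roundtrip(self, value):
       return pickler.loads(pickler.dumps(value))
   
     def test_roundtrip_with_dill(self):
       pickler.set_pickle_lib(pickler.USE_DILL)
       self.assertEqual([1, 2, 3], self._roundtrip([1, 2, 3]))
   
     def test_roundtrip_with_cloudpickle(self):
       pickler.set_pickle_lib(pickler.USE_CLOUDPICKLE)
       self.assertEqual([1, 2, 3], self._roundtrip([1, 2, 3]))
   ```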

##########
File path: sdks/python/apache_beam/coders/coders_test_common.py
##########
@@ -680,14 +682,14 @@ def iterable_state_read(token, element_coder_impl):
     context = pipeline_context.PipelineContext(
         iterable_state_read=iterable_state_read,
         iterable_state_write=iterable_state_write)
-    self.check_coder(
-        coder, [1, 2, 3], context=context, test_size_estimation=False)
+    # Note: do not use check_coder; see https://github.com/cloudpipe/cloudpickle/issues/452

Review comment:
       Sounds like a possible regression that we'd need to document. I pinged the cloudpickle bug to see if they would consider fixing it. A simplified sketch of the kind of behavior involved is below.
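   
   For context, a heavily simplified sketch of the state-sharing pitfall that nested test helpers hit when they are pickled by value. This illustrates the general closure behavior behind the workaround, not necessarily the exact failure tracked in the linked issue:
   
   ```python
   import pickle
   
   import cloudpickle
   
   
   def make_writer():
     state = {}  # test-local state captured by the nested helper
   
     def write(value):
       state['k'] = value
   
     return write, state
   
   
   write, state = make_writer()
   # Nested functions are pickled by value, so the restored copy closes over a
   # copy of 'state' rather than the original dict.
   restored = pickle.loads(cloudpickle.dumps(write))
   restored(42)
   print(state)  # {} -- mutations made by the restored function are not visible here
   ```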




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org