Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/02 06:03:06 UTC

[spark] branch master updated: [SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d30e3365c1 [SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0
9d30e3365c1 is described below

commit 9d30e3365c17b94438290e59e6900ada570b94cc
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Wed Nov 2 15:02:53 2022 +0900

    [SPARK-40991][PYTHON] Update `cloudpickle` to v2.2.0
    
    ### What changes were proposed in this pull request?
    
    This PR aims to update `cloudpickle` to `v2.2.0` for Apache Spark 3.4.0.
    
    ### Why are the changes needed?
    
    SPARK-37457 updated `cloudpickle` to v2.0.0 for Apache Spark 3.3.0.
    
    This update brings in the latest bug fixes:
    - https://github.com/cloudpipe/cloudpickle/releases/tag/v2.2.0
    - https://github.com/cloudpipe/cloudpickle/releases/tag/2.1.0
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #38474 from dongjoon-hyun/SPARK-40991.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/cloudpickle/__init__.py         |  5 +-
 python/pyspark/cloudpickle/cloudpickle.py      | 77 ++++++++--------------
 python/pyspark/cloudpickle/cloudpickle_fast.py | 91 ++++++++++++++++++--------
 3 files changed, 90 insertions(+), 83 deletions(-)

diff --git a/python/pyspark/cloudpickle/__init__.py b/python/pyspark/cloudpickle/__init__.py
index 0ae79b5535c..efbf1178d43 100644
--- a/python/pyspark/cloudpickle/__init__.py
+++ b/python/pyspark/cloudpickle/__init__.py
@@ -1,6 +1,3 @@
-from __future__ import absolute_import
-
-
 from pyspark.cloudpickle.cloudpickle import *  # noqa
 from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump  # noqa
 
@@ -8,4 +5,4 @@ from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump  # no
 # expose their Pickler subclass at top-level under the  "Pickler" name.
 Pickler = CloudPickler
 
-__version__ = '2.0.0'
+__version__ = '2.2.0'
diff --git a/python/pyspark/cloudpickle/cloudpickle.py b/python/pyspark/cloudpickle/cloudpickle.py
index 347b3869580..317be69151a 100644
--- a/python/pyspark/cloudpickle/cloudpickle.py
+++ b/python/pyspark/cloudpickle/cloudpickle.py
@@ -40,7 +40,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
-from __future__ import print_function
 
 import builtins
 import dis
@@ -56,7 +55,7 @@ import warnings
 
 from .compat import pickle
 from collections import OrderedDict
-from typing import Generic, Union, Tuple, Callable
+from typing import ClassVar, Generic, Union, Tuple, Callable
 from pickle import _getattribute
 from importlib._bootstrap import _find_spec
 
@@ -66,11 +65,6 @@ try:  # pragma: no branch
 except ImportError:
     _typing_extensions = Literal = Final = None
 
-if sys.version_info >= (3, 5, 3):
-    from typing import ClassVar
-else:  # pragma: no cover
-    ClassVar = None
-
 if sys.version_info >= (3, 8):
     from types import CellType
 else:
@@ -327,11 +321,10 @@ def _extract_code_globals(co):
     """
     out_names = _extract_code_globals_cache.get(co)
     if out_names is None:
-        names = co.co_names
         # We use a dict with None values instead of a set to get a
         # deterministic order (assuming Python 3.6+) and avoid introducing
         # non-deterministic pickle bytes as a result.
-        out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
+        out_names = {name: None for name in _walk_global_ops(co)}
 
         # Declaring a function inside another one using the "def ..."
         # syntax generates a constant code object corresponding to the one
@@ -517,13 +510,12 @@ def _builtin_type(name):
 
 def _walk_global_ops(code):
     """
-    Yield (opcode, argument number) tuples for all
-    global-referencing instructions in *code*.
+    Yield the referenced name for each global-referencing instruction in *code*.
     """
     for instr in dis.get_instructions(code):
         op = instr.opcode
         if op in GLOBAL_OPS:
-            yield op, instr.arg
+            yield instr.argval
 
 
 def _extract_class_dict(cls):
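
A minimal sketch (illustrative, not part of the patch) of why yielding
`instr.argval` is enough here: `dis` already resolves the name index into
the actual global name, so the caller no longer needs to index `co_names`
with the oparg itself:

    import dis

    def f():
        return len("spark")  # `len` is referenced via LOAD_GLOBAL

    for instr in dis.get_instructions(f.__code__):
        if instr.opname == "LOAD_GLOBAL":
            print(instr.argval)  # -> len
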
@@ -604,43 +596,21 @@ def parametrized_type_hint_getinitargs(obj):
     elif type(obj) is type(ClassVar):
         initargs = (ClassVar, obj.__type__)
     elif type(obj) is type(Generic):
-        parameters = obj.__parameters__
-        if len(obj.__parameters__) > 0:
-            # in early Python 3.5, __parameters__ was sometimes
-            # preferred to __args__
-            initargs = (obj.__origin__, parameters)
-
-        else:
-            initargs = (obj.__origin__, obj.__args__)
+        initargs = (obj.__origin__, obj.__args__)
     elif type(obj) is type(Union):
-        if sys.version_info < (3, 5, 3):  # pragma: no cover
-            initargs = (Union, obj.__union_params__)
-        else:
-            initargs = (Union, obj.__args__)
+        initargs = (Union, obj.__args__)
     elif type(obj) is type(Tuple):
-        if sys.version_info < (3, 5, 3):  # pragma: no cover
-            initargs = (Tuple, obj.__tuple_params__)
-        else:
-            initargs = (Tuple, obj.__args__)
+        initargs = (Tuple, obj.__args__)
     elif type(obj) is type(Callable):
-        if sys.version_info < (3, 5, 3):  # pragma: no cover
-            args = obj.__args__
-            result = obj.__result__
-            if args != Ellipsis:
-                if isinstance(args, tuple):
-                    args = list(args)
-                else:
-                    args = [args]
+        (*args, result) = obj.__args__
+        if len(args) == 1 and args[0] is Ellipsis:
+            args = Ellipsis
         else:
-            (*args, result) = obj.__args__
-            if len(args) == 1 and args[0] is Ellipsis:
-                args = Ellipsis
-            else:
-                args = list(args)
+            args = list(args)
         initargs = (Callable, (args, result))
     else:  # pragma: no cover
         raise pickle.PicklingError(
-            "Cloudpickle Error: Unknown type {}".format(type(obj))
+            f"Cloudpickle Error: Unknown type {type(obj)}"
         )
     return initargs
 
@@ -720,7 +690,7 @@ def instance(cls):
 
 
 @instance
-class _empty_cell_value(object):
+class _empty_cell_value:
     """sentinel for empty closures
     """
     @classmethod
@@ -749,7 +719,7 @@ def _fill_function(*args):
         keys = ['globals', 'defaults', 'dict', 'module', 'closure_values']
         state = dict(zip(keys, args[1:]))
     else:
-        raise ValueError('Unexpected _fill_value arguments: %r' % (args,))
+        raise ValueError(f'Unexpected _fill_value arguments: {args!r}')
 
     # - At pickling time, any dynamic global variable used by func is
     #   serialized by value (in state['globals']).
@@ -793,6 +763,12 @@ def _fill_function(*args):
     return func
 
 
+def _make_function(code, globals, name, argdefs, closure):
+    # Setting __builtins__ in globals is needed for nogil CPython.
+    globals["__builtins__"] = __builtins__
+    return types.FunctionType(code, globals, name, argdefs, closure)
+
+
 def _make_empty_cell():
     if False:
         # trick the compiler into creating an empty cell in our lambda
@@ -917,15 +893,10 @@ def _make_typevar(name, bound, constraints, covariant, contravariant,
 
 
 def _decompose_typevar(obj):
-    try:
-        class_tracker_id = _get_or_create_tracker_id(obj)
-    except TypeError:  # pragma: nocover
-        # TypeVar instances are not weakref-able in Python 3.5.3
-        class_tracker_id = None
     return (
         obj.__name__, obj.__bound__, obj.__constraints__,
         obj.__covariant__, obj.__contravariant__,
-        class_tracker_id,
+        _get_or_create_tracker_id(obj),
     )
 
 
@@ -943,8 +914,12 @@ def _typevar_reduce(obj):
 
 
 def _get_bases(typ):
-    if hasattr(typ, '__orig_bases__'):
+    if '__orig_bases__' in getattr(typ, '__dict__', {}):
         # For generic types (see PEP 560)
+        # Note that simply checking `hasattr(typ, '__orig_bases__')` is not
+        # correct.  Subclasses of a fully-parameterized generic class do not
+        # have `__orig_bases__` defined, but `hasattr(typ, '__orig_bases__')`
+        # will return True because it's defined in the base class.
         bases_attr = '__orig_bases__'
     else:
         # For regular class objects
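
A minimal sketch (illustrative, not part of the patch) of the pitfall the
new comment describes: a plain subclass inherits `__orig_bases__` from its
parameterized base, so `hasattr` reports a false positive while the
`__dict__` check does not:

    from typing import Generic, TypeVar

    T = TypeVar("T")

    class Box(Generic[T]):
        pass

    class IntBox(Box[int]):  # fully parameterized generic base
        pass

    class Sub(IntBox):  # plain subclass, no generic bases of its own
        pass

    print(hasattr(Sub, "__orig_bases__"))    # True (inherited from IntBox)
    print("__orig_bases__" in Sub.__dict__)  # False (not defined on Sub)
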
diff --git a/python/pyspark/cloudpickle/cloudpickle_fast.py b/python/pyspark/cloudpickle/cloudpickle_fast.py
index 6db059eb858..8741dcbdaaa 100644
--- a/python/pyspark/cloudpickle/cloudpickle_fast.py
+++ b/python/pyspark/cloudpickle/cloudpickle_fast.py
@@ -35,11 +35,11 @@ from .cloudpickle import (
     _is_parametrized_type_hint, PYPY, cell_set,
     parametrized_type_hint_getinitargs, _create_parametrized_type_hint,
     builtin_code_type,
-    _make_dict_keys, _make_dict_values, _make_dict_items,
+    _make_dict_keys, _make_dict_values, _make_dict_items, _make_function,
 )
 
 
-if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
+if pickle.HIGHEST_PROTOCOL >= 5:
     # Shorthands similar to pickle.dump/pickle.dumps
 
     def dump(obj, file, protocol=None, buffer_callback=None):
@@ -123,7 +123,7 @@ def _class_getnewargs(obj):
 
 
 def _enum_getnewargs(obj):
-    members = dict((e.name, e.value) for e in obj)
+    members = {e.name: e.value for e in obj}
     return (obj.__bases__, obj.__name__, obj.__qualname__, members,
             obj.__module__, _get_or_create_tracker_id(obj), None)
 
@@ -218,7 +218,7 @@ def _class_getstate(obj):
 def _enum_getstate(obj):
     clsdict, slotstate = _class_getstate(obj)
 
-    members = dict((e.name, e.value) for e in obj)
+    members = {e.name: e.value for e in obj}
     # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class:
     # Those attributes are already handled by the metaclass.
     for attrname in ["_generate_next_value_", "_member_names_",
@@ -244,7 +244,22 @@ def _enum_getstate(obj):
 
 def _code_reduce(obj):
     """codeobject reducer"""
-    if hasattr(obj, "co_linetable"):  # pragma: no branch
+    # If you are not sure about the order of arguments, take a look at the
+    # help of the specific type from the types module, for example:
+    # >>> from types import CodeType
+    # >>> help(CodeType)
+    if hasattr(obj, "co_exceptiontable"):  # pragma: no branch
+        # Python 3.11 and later: there are some new attributes
+        # related to the enhanced exceptions.
+        args = (
+            obj.co_argcount, obj.co_posonlyargcount,
+            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
+            obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
+            obj.co_varnames, obj.co_filename, obj.co_name, obj.co_qualname,
+            obj.co_firstlineno, obj.co_linetable, obj.co_exceptiontable,
+            obj.co_freevars, obj.co_cellvars,
+        )
+    elif hasattr(obj, "co_linetable"):  # pragma: no branch
         # Python 3.10 and later: obj.co_lnotab is deprecated and constructor
         # expects obj.co_linetable instead.
         args = (
@@ -255,6 +270,18 @@ def _code_reduce(obj):
             obj.co_firstlineno, obj.co_linetable, obj.co_freevars,
             obj.co_cellvars
         )
+    elif hasattr(obj, "co_nmeta"):  # pragma: no cover
+        # "nogil" Python: modified attributes from 3.9
+        args = (
+            obj.co_argcount, obj.co_posonlyargcount,
+            obj.co_kwonlyargcount, obj.co_nlocals, obj.co_framesize,
+            obj.co_ndefaultargs, obj.co_nmeta,
+            obj.co_flags, obj.co_code, obj.co_consts,
+            obj.co_varnames, obj.co_filename, obj.co_name,
+            obj.co_firstlineno, obj.co_lnotab, obj.co_exc_handlers,
+            obj.co_jump_table, obj.co_freevars, obj.co_cellvars,
+            obj.co_free2reg, obj.co_cell2reg
+        )
     elif hasattr(obj, "co_posonlyargcount"):
         # Backward compat for 3.9 and older
         args = (
@@ -534,7 +561,10 @@ class CloudPickler(Pickler):
     _dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce
     _dispatch_table[type(OrderedDict().values())] = _odict_values_reduce
     _dispatch_table[type(OrderedDict().items())] = _odict_items_reduce
-
+    _dispatch_table[abc.abstractmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractclassmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractstaticmethod] = _classmethod_reduce
+    _dispatch_table[abc.abstractproperty] = _property_reduce
 
     dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)
 
@@ -544,7 +574,7 @@ class CloudPickler(Pickler):
         """Reduce a function that is not pickleable via attribute lookup."""
         newargs = self._function_getnewargs(func)
         state = _function_getstate(func)
-        return (types.FunctionType, newargs, state, None, None,
+        return (_make_function, newargs, state, None, None,
                 _function_setstate)
 
     def _function_reduce(self, obj):
@@ -611,6 +641,32 @@ class CloudPickler(Pickler):
                 raise
 
     if pickle.HIGHEST_PROTOCOL >= 5:
+        def __init__(self, file, protocol=None, buffer_callback=None):
+            if protocol is None:
+                protocol = DEFAULT_PROTOCOL
+            Pickler.__init__(
+                self, file, protocol=protocol, buffer_callback=buffer_callback
+            )
+            # map functions __globals__ attribute ids, to ensure that functions
+            # sharing the same global namespace at pickling time also share
+            # their global namespace at unpickling time.
+            self.globals_ref = {}
+            self.proto = int(protocol)
+    else:
+        def __init__(self, file, protocol=None):
+            if protocol is None:
+                protocol = DEFAULT_PROTOCOL
+            Pickler.__init__(self, file, protocol=protocol)
+            # map functions __globals__ attribute ids, to ensure that functions
+            # sharing the same global namespace at pickling time also share
+            # their global namespace at unpickling time.
+            self.globals_ref = {}
+            assert hasattr(self, 'proto')
+
+    if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
+        # Pickler is the C implementation of the CPython pickler and therefore
+        # we rely on the reducer_override method to customize the pickler behavior.
+
         # `CloudPickler.dispatch` is only left for backward compatibility - note
         # that when using protocol 5, `CloudPickler.dispatch` is not an
         # extension of `Pickler.dispatch` dictionary, because CloudPickler
@@ -631,17 +687,6 @@ class CloudPickler(Pickler):
         # availability of both notions coincide on CPython's pickle and the
         # pickle5 backport, but it may not be the case anymore when pypy
         # implements protocol 5
-        def __init__(self, file, protocol=None, buffer_callback=None):
-            if protocol is None:
-                protocol = DEFAULT_PROTOCOL
-            Pickler.__init__(
-                self, file, protocol=protocol, buffer_callback=buffer_callback
-            )
-            # map functions __globals__ attribute ids, to ensure that functions
-            # sharing the same global namespace at pickling time also share
-            # their global namespace at unpickling time.
-            self.globals_ref = {}
-            self.proto = int(protocol)
 
         def reducer_override(self, obj):
             """Type-agnostic reducing callback for function and classes.
@@ -702,16 +747,6 @@ class CloudPickler(Pickler):
         # hard-coded call to save_global when pickling meta-classes.
         dispatch = Pickler.dispatch.copy()
 
-        def __init__(self, file, protocol=None):
-            if protocol is None:
-                protocol = DEFAULT_PROTOCOL
-            Pickler.__init__(self, file, protocol=protocol)
-            # map functions __globals__ attribute ids, to ensure that functions
-            # sharing the same global namespace at pickling time also share
-            # their global namespace at unpickling time.
-            self.globals_ref = {}
-            assert hasattr(self, 'proto')
-
         def _save_reduce_pickle5(self, func, args, state=None, listitems=None,
                                  dictitems=None, state_setter=None, obj=None):
             save = self.save
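
As an end-to-end check of the vendored module after this upgrade, here is a
minimal usage sketch (a hypothetical example, not taken from the patch,
assuming `pyspark` is importable): cloudpickle serializes dynamic functions
such as lambdas by value, and its output remains loadable by the standard
`pickle` module:

    import pickle
    from pyspark import cloudpickle

    assert cloudpickle.__version__ == "2.2.0"

    # Lambdas have no importable name, so the stdlib pickler rejects them;
    # cloudpickle serializes them by value instead.
    add_one = lambda x: x + 1
    payload = cloudpickle.dumps(add_one)

    # The payload is plain pickle data, so stdlib pickle can load it.
    restored = pickle.loads(payload)
    assert restored(41) == 42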

