You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2017/04/17 17:04:00 UTC

spark git commit: [SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked `collections.namedtuple` and port cloudpickle changes for PySpark to work with Python 3.6.0

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 24f6ef25a -> 84be4c8d6


[SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked `collections.namedtuple` and port cloudpickle changes for PySpark to work with Python 3.6.0

## What changes were proposed in this pull request?

This PR proposes to backport https://github.com/apache/spark/pull/16429 to branch-2.0 so that Python 3.6.0 works with Spark 2.0.x.

## How was this patch tested?

Manually, via

```
./run-tests --python-executables=python3.6
```

```
Finished test(python3.6): pyspark.tests (124s)
Finished test(python3.6): pyspark.accumulators (4s)
Finished test(python3.6): pyspark.broadcast (4s)
Finished test(python3.6): pyspark.conf (3s)
Finished test(python3.6): pyspark.context (15s)
Finished test(python3.6): pyspark.ml.classification (24s)
Finished test(python3.6): pyspark.sql.tests (190s)
Finished test(python3.6): pyspark.mllib.tests (190s)
Finished test(python3.6): pyspark.ml.clustering (14s)
Finished test(python3.6): pyspark.ml.linalg.__init__ (0s)
Finished test(python3.6): pyspark.ml.recommendation (18s)
Finished test(python3.6): pyspark.ml.feature (28s)
Finished test(python3.6): pyspark.ml.evaluation (28s)
Finished test(python3.6): pyspark.ml.regression (21s)
Finished test(python3.6): pyspark.ml.tuning (17s)
Finished test(python3.6): pyspark.streaming.tests (239s)
Finished test(python3.6): pyspark.mllib.evaluation (15s)
Finished test(python3.6): pyspark.mllib.classification (24s)
Finished test(python3.6): pyspark.mllib.clustering (37s)
Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s)
Finished test(python3.6): pyspark.mllib.fpm (19s)
Finished test(python3.6): pyspark.mllib.feature (19s)
Finished test(python3.6): pyspark.mllib.random (8s)
Finished test(python3.6): pyspark.ml.tests (76s)
Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s)
Finished test(python3.6): pyspark.mllib.recommendation (21s)
Finished test(python3.6): pyspark.mllib.linalg.distributed (27s)
Finished test(python3.6): pyspark.mllib.regression (22s)
Finished test(python3.6): pyspark.mllib.stat._statistics (11s)
Finished test(python3.6): pyspark.mllib.tree (16s)
Finished test(python3.6): pyspark.profiler (8s)
Finished test(python3.6): pyspark.shuffle (1s)
Finished test(python3.6): pyspark.mllib.util (17s)
Finished test(python3.6): pyspark.serializers (12s)
Finished test(python3.6): pyspark.rdd (18s)
Finished test(python3.6): pyspark.sql.conf (4s)
Finished test(python3.6): pyspark.sql.catalog (14s)
Finished test(python3.6): pyspark.sql.column (13s)
Finished test(python3.6): pyspark.sql.context (15s)
Finished test(python3.6): pyspark.sql.group (26s)
Finished test(python3.6): pyspark.sql.dataframe (31s)
Finished test(python3.6): pyspark.sql.functions (32s)
Finished test(python3.6): pyspark.sql.types (5s)
Finished test(python3.6): pyspark.sql.streaming (11s)
Finished test(python3.6): pyspark.sql.window (5s)
Finished test(python3.6): pyspark.streaming.util (0s)
Finished test(python3.6): pyspark.sql.session (15s)
Finished test(python3.6): pyspark.sql.readwriter (34s)
Tests passed in 376 seconds
```

Author: hyukjinkwon <gu...@gmail.com>

Closes #17374 from HyukjinKwon/SPARK-19019-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84be4c8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84be4c8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84be4c8d

Branch: refs/heads/branch-2.0
Commit: 84be4c8d6fd52774462762f1f5972f60d286c289
Parents: 24f6ef2
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Apr 17 10:03:42 2017 -0700
Committer: Holden Karau <ho...@us.ibm.com>
Committed: Mon Apr 17 10:03:42 2017 -0700

----------------------------------------------------------------------
 python/pyspark/cloudpickle.py | 98 ++++++++++++++++++++++++++------------
 python/pyspark/serializers.py | 20 ++++++++
 2 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84be4c8d/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 822ae46..94168f0 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -43,6 +43,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 from __future__ import print_function
 
 import operator
+import opcode
 import os
 import io
 import pickle
@@ -53,6 +54,8 @@ from functools import partial
 import itertools
 import dis
 import traceback
+import weakref
+
 
 if sys.version < '3':
     from pickle import Pickler
@@ -68,10 +71,10 @@ else:
     PY3 = True
 
 #relevant opcodes
-STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
-DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
-LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
+DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
+LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
+GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
 HAVE_ARGUMENT = dis.HAVE_ARGUMENT
 EXTENDED_ARG = dis.EXTENDED_ARG
 
@@ -90,6 +93,43 @@ def _builtin_type(name):
     return getattr(types, name)
 
 
+if sys.version_info < (3, 4):
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        code = getattr(code, 'co_code', b'')
+        if not PY3:
+            code = map(ord, code)
+
+        n = len(code)
+        i = 0
+        extended_arg = 0
+        while i < n:
+            op = code[i]
+            i += 1
+            if op >= HAVE_ARGUMENT:
+                oparg = code[i] + code[i + 1] * 256 + extended_arg
+                extended_arg = 0
+                i += 2
+                if op == EXTENDED_ARG:
+                    extended_arg = oparg * 65536
+                if op in GLOBAL_OPS:
+                    yield op, oparg
+
+else:
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        for instr in dis.get_instructions(code):
+            op = instr.opcode
+            if op in GLOBAL_OPS:
+                yield op, instr.arg
+
+
 class CloudPickler(Pickler):
 
     dispatch = Pickler.dispatch.copy()
@@ -250,38 +290,34 @@ class CloudPickler(Pickler):
         write(pickle.TUPLE)
         write(pickle.REDUCE)  # applies _fill_function on the tuple
 
-    @staticmethod
-    def extract_code_globals(co):
+    _extract_code_globals_cache = (
+        weakref.WeakKeyDictionary()
+        if sys.version_info >= (2, 7) and not hasattr(sys, "pypy_version_info")
+        else {})
+
+    @classmethod
+    def extract_code_globals(cls, co):
         """
         Find all globals names read or written to by codeblock co
         """
-        code = co.co_code
-        if not PY3:
-            code = [ord(c) for c in code]
-        names = co.co_names
-        out_names = set()
-
-        n = len(code)
-        i = 0
-        extended_arg = 0
-        while i < n:
-            op = code[i]
+        out_names = cls._extract_code_globals_cache.get(co)
+        if out_names is None:
+            try:
+                names = co.co_names
+            except AttributeError:
+                # PyPy "builtin-code" object
+                out_names = set()
+            else:
+                out_names = set(names[oparg]
+                                for op, oparg in _walk_global_ops(co))
 
-            i += 1
-            if op >= HAVE_ARGUMENT:
-                oparg = code[i] + code[i+1] * 256 + extended_arg
-                extended_arg = 0
-                i += 2
-                if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536
-                if op in GLOBAL_OPS:
-                    out_names.add(names[oparg])
+                # see if nested function have any global refs
+                if co.co_consts:
+                    for const in co.co_consts:
+                        if type(const) is types.CodeType:
+                            out_names |= cls.extract_code_globals(const)
 
-        # see if nested function have any global refs
-        if co.co_consts:
-            for const in co.co_consts:
-                if type(const) is types.CodeType:
-                    out_names |= CloudPickler.extract_code_globals(const)
+            cls._extract_code_globals_cache[co] = out_names
 
         return out_names
 

http://git-wip-us.apache.org/repos/asf/spark/blob/84be4c8d/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 2a13269..a9e14b8 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -370,18 +370,38 @@ def _hijack_namedtuple():
         return
 
     global _old_namedtuple  # or it will put in closure
+    global _old_namedtuple_kwdefaults  # or it will put in closure too
 
     def _copy_func(f):
         return types.FunctionType(f.__code__, f.__globals__, f.__name__,
                                   f.__defaults__, f.__closure__)
 
+    def _kwdefaults(f):
+        # __kwdefaults__ contains the default values of keyword-only arguments which are
+        # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
+        # are as below:
+        #
+        # - Does not exist in Python 2.
+        # - Returns None in <= Python 3.5.x.
+        # - Returns a dictionary containing the default values to the keys from Python 3.6.x
+        #    (See https://bugs.python.org/issue25628).
+        kargs = getattr(f, "__kwdefaults__", None)
+        if kargs is None:
+            return {}
+        else:
+            return kargs
+
     _old_namedtuple = _copy_func(collections.namedtuple)
+    _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
 
     def namedtuple(*args, **kwargs):
+        for k, v in _old_namedtuple_kwdefaults.items():
+            kwargs[k] = kwargs.get(k, v)
         cls = _old_namedtuple(*args, **kwargs)
         return _hack_namedtuple(cls)
 
     # replace namedtuple with new one
+    collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
     collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
     collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
     collections.namedtuple.__code__ = namedtuple.__code__


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org