Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/16 09:23:03 UTC

[spark] branch branch-3.3 updated: [SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3bbf346  [SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4
3bbf346 is described below

commit 3bbf346d9ca984faa0c3e67cd1387a13b2bd1e37
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Wed Mar 16 18:20:50 2022 +0900

    [SPARK-38563][PYTHON] Upgrade to Py4J 0.10.9.4
    
    ### What changes were proposed in this pull request?
    
    This PR upgrades Py4J to 0.10.9.4, with the relevant documentation changes.
    
    ### Why are the changes needed?
    
    Py4J 0.10.9.3 has a resource leak when pinned thread mode is enabled, and pinned thread mode has been the default in PySpark since https://github.com/apache/spark/commit/41af409b7bcfe1b3960274c0b3085bcc1f9d1c98.
    We previously worked around this by requiring users to use `InheritableThread` or `inheritable_thread_target`.
    After this upgrade, that requirement is no longer needed because Py4J cleans up thread connections automatically; see also https://github.com/py4j/py4j/pull/471.
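    
    For context, below is a minimal, illustrative sketch (not part of this diff) of the workaround that pinned thread mode previously required when submitting jobs from multiple PVM threads; the `SparkContext` setup and the job body are placeholders.
    
    ```python
    import threading
    
    from pyspark import SparkContext, InheritableThread, inheritable_thread_target
    
    sc = SparkContext.getOrCreate()
    
    def run_job():
        # Runs a Spark job in a child thread; local properties such as the
        # job group set below are inherited from the parent thread.
        sc.parallelize(range(10)).count()
    
    sc.setJobGroup("group1", "demo job group")
    
    # Option 1: InheritableThread inherits local properties and, before this
    # upgrade, also cleaned up the Py4J thread connection on exit.
    t = InheritableThread(target=run_job)
    t.start()
    t.join()
    
    # Option 2: wrap the target so a plain thread gets the same behavior.
    t2 = threading.Thread(target=inheritable_thread_target(run_job))
    t2.start()
    t2.join()
    ```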
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users no longer have to use `InheritableThread` or `inheritable_thread_target` to avoid the resource leak.
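    
    As an illustrative sketch (not from this diff), a plain `threading.Thread` no longer leaks the Py4J connection after this upgrade, although `InheritableThread` is still recommended when the child thread should inherit JVM local properties:
    
    ```python
    import threading
    
    from pyspark import SparkContext
    
    sc = SparkContext.getOrCreate()
    
    def run_job():
        # The Py4J connection used by this thread is now cleaned up by Py4J
        # itself, so no explicit wrapper is needed just to avoid the leak.
        sc.parallelize(range(10)).count()
    
    t = threading.Thread(target=run_job)
    t.start()
    t.join()
    ```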
    
    ### How was this patch tested?
    
    CI in this PR should test it out.
    
    Closes #35871 from HyukjinKwon/SPARK-38563.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 8193b405f02f867439dd2d2017bf7b3c814b5cc8)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 bin/pyspark                                        |   2 +-
 bin/pyspark2.cmd                                   |   2 +-
 core/pom.xml                                       |   2 +-
 .../org/apache/spark/api/python/PythonUtils.scala  |   2 +-
 dev/deps/spark-deps-hadoop-2-hive-2.3              |   2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3              |   2 +-
 docs/job-scheduling.md                             |   2 +-
 python/docs/Makefile                               |   2 +-
 python/docs/make2.bat                              |   2 +-
 python/docs/source/getting_started/install.rst     |   2 +-
 python/lib/py4j-0.10.9.3-src.zip                   | Bin 42021 -> 0 bytes
 python/lib/py4j-0.10.9.4-src.zip                   | Bin 0 -> 42404 bytes
 python/pyspark/context.py                          |   6 ++--
 python/pyspark/util.py                             |  35 +++------------------
 python/setup.py                                    |   2 +-
 sbin/spark-config.sh                               |   2 +-
 16 files changed, 20 insertions(+), 45 deletions(-)

diff --git a/bin/pyspark b/bin/pyspark
index 4840589..1e16c56 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a19627a..f20c320 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.4-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index 9d3b170..953c76b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -423,7 +423,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.10.9.3</version>
+      <version>0.10.9.4</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 8daba86..a9c35369 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 
 private[spark] object PythonUtils {
-  val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip"
+  val PY4J_ZIP_NAME = "py4j-0.10.9.4-src.zip"
 
   /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
   def sparkPythonPath: String = {
diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index bcbf8b9..f2db663 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
 parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
 pickle/1.2//pickle-1.2.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.4//py4j-0.10.9.4.jar
 remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
 rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
 scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 8ca7880..c56b4c9 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
 parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
 pickle/1.2//pickle-1.2.jar
 protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.3//py4j-0.10.9.3.jar
+py4j/0.10.9.4//py4j-0.10.9.4.jar
 remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
 rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
 scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 4ed2aa9..f44ed82 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel th
 later.
 
 `pyspark.InheritableThread` is recommended to use together for a PVM thread to inherit the inheritable attributes
- such as local properties in a JVM thread, and to avoid resource leak.
+ such as local properties in a JVM thread.
 
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 9cb1a17..2628530 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -21,7 +21,7 @@ SPHINXBUILD   ?= sphinx-build
 SOURCEDIR     ?= source
 BUILDDIR      ?= build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.4-src.zip)
 
 # Put it first so that "make" without argument is like "make help".
 help:
diff --git a/python/docs/make2.bat b/python/docs/make2.bat
index 2e4e2b5..26ef220 100644
--- a/python/docs/make2.bat
+++ b/python/docs/make2.bat
@@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
 set SOURCEDIR=source
 set BUILDDIR=build
 
-set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip
+set PYTHONPATH=..;..\lib\py4j-0.10.9.4-src.zip
 
 if "%1" == "" goto help
 
diff --git a/python/docs/source/getting_started/install.rst b/python/docs/source/getting_started/install.rst
index 15a1240..3503be0 100644
--- a/python/docs/source/getting_started/install.rst
+++ b/python/docs/source/getting_started/install.rst
@@ -157,7 +157,7 @@ Package       Minimum supported version Note
 `pandas`      1.0.5                     Optional for Spark SQL
 `NumPy`       1.7                       Required for MLlib DataFrame-based API
 `pyarrow`     1.0.0                     Optional for Spark SQL
-`Py4J`        0.10.9.3                  Required
+`Py4J`        0.10.9.4                  Required
 `pandas`      1.0.5                     Required for pandas API on Spark
 `pyarrow`     1.0.0                     Required for pandas API on Spark
 `Numpy`       1.14                      Required for pandas API on Spark
diff --git a/python/lib/py4j-0.10.9.3-src.zip b/python/lib/py4j-0.10.9.3-src.zip
deleted file mode 100644
index 428f3ac..0000000
Binary files a/python/lib/py4j-0.10.9.3-src.zip and /dev/null differ
diff --git a/python/lib/py4j-0.10.9.4-src.zip b/python/lib/py4j-0.10.9.4-src.zip
new file mode 100644
index 0000000..51b3404
Binary files /dev/null and b/python/lib/py4j-0.10.9.4-src.zip differ
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index e47f162..59b5fa7 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1365,7 +1365,7 @@ class SparkContext:
         to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
 
         If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        local inheritance.
 
         Examples
         --------
@@ -1405,7 +1405,7 @@ class SparkContext:
         Notes
         -----
         If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        local inheritance.
         """
         self._jsc.setLocalProperty(key, value)
 
@@ -1423,7 +1423,7 @@ class SparkContext:
         Notes
         -----
         If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        local inheritance.
         """
         self._jsc.setJobDescription(value)
 
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index 5abbbb9..b7b972a 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -331,13 +331,10 @@ def inheritable_thread_target(f: Callable) -> Callable:
 
         @functools.wraps(f)
         def wrapped(*args: Any, **kwargs: Any) -> Any:
-            try:
-                # Set local properties in child thread.
-                assert SparkContext._active_spark_context is not None
-                SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
-                return f(*args, **kwargs)
-            finally:
-                InheritableThread._clean_py4j_conn_for_current_thread()
+            # Set local properties in child thread.
+            assert SparkContext._active_spark_context is not None
+            SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
+            return f(*args, **kwargs)
 
         return wrapped
     else:
@@ -377,10 +374,7 @@ class InheritableThread(threading.Thread):
                 assert hasattr(self, "_props")
                 assert SparkContext._active_spark_context is not None
                 SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props)
-                try:
-                    return target(*a, **k)
-                finally:
-                    InheritableThread._clean_py4j_conn_for_current_thread()
+                return target(*a, **k)
 
             super(InheritableThread, self).__init__(
                 target=copy_local_properties, *args, **kwargs  # type: ignore[misc]
@@ -401,25 +395,6 @@ class InheritableThread(threading.Thread):
             self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
         return super(InheritableThread, self).start()
 
-    @staticmethod
-    def _clean_py4j_conn_for_current_thread() -> None:
-        from pyspark import SparkContext
-
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        thread_connection = jvm._gateway_client.get_thread_connection()
-        if thread_connection is not None:
-            try:
-                # Dequeue is shared across other threads but it's thread-safe.
-                # If this function has to be invoked one more time in the same thead
-                # Py4J will create a new connection automatically.
-                jvm._gateway_client.deque.remove(thread_connection)
-            except ValueError:
-                # Should never reach this point
-                return
-            finally:
-                thread_connection.close()
-
 
 if __name__ == "__main__":
     if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7):
diff --git a/python/setup.py b/python/setup.py
index 673b146..ab9b64f 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -258,7 +258,7 @@ try:
         license='http://www.apache.org/licenses/LICENSE-2.0',
         # Don't forget to update python/docs/source/getting_started/install.rst
         # if you're updating the versions or dependencies.
-        install_requires=['py4j==0.10.9.3'],
+        install_requires=['py4j==0.10.9.4'],
         extras_require={
             'ml': ['numpy>=1.15'],
             'mllib': ['numpy>=1.15'],
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index f27b6fe..341eb05 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
   export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-  export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
+  export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:${PYTHONPATH}"
   export PYSPARK_PYTHONPATH_SET=1
 fi
