Posted to commits@spark.apache.org by zs...@apache.org on 2016/01/12 23:27:08 UTC

spark git commit: [SPARK-12652][PYSPARK] Upgrade Py4J to 0.9.1

Repository: spark
Updated Branches:
  refs/heads/master 8ed5f12d2 -> 4f60651cb


[SPARK-12652][PYSPARK] Upgrade Py4J to 0.9.1

- [x] Upgrade Py4J to 0.9.1
- [x] SPARK-12657: Revert SPARK-12617
- [x] SPARK-12658: Revert SPARK-12511
  - Still keep the change that reads the checkpoint only once. This is a manual change and is worth reviewing carefully: https://github.com/zsxwing/spark/commit/bfd4b5c040eb29394c3132af3c670b1a7272457c
- [x] Verify that nothing leaks any more after reverting our workarounds (a manual check is sketched below)
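
A minimal, hedged sketch of such a manual leak check, reusing the gateway internals that the removed Py4jCallbackConnectionCleaner below relied on (assumes a running StreamingContext whose Py4J gateway is bound to `gw`; this snippet is illustrative only and is not part of the commit):

    # Hedged sketch: count dead Py4J callback connections by hand.
    # The attribute names (_callback_server, connections, isAlive) mirror the
    # cleaner code removed in python/pyspark/streaming/context.py further down.
    callback_server = gw._callback_server
    if callback_server:
        with callback_server.lock:
            dead = [c for c in callback_server.connections if not c.isAlive()]
        print("dead callback connections: %d" % len(dead))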

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10692 from zsxwing/py4j-0.9.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f60651c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f60651c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f60651c

Branch: refs/heads/master
Commit: 4f60651cbec1b4c9cc2e6d832ace77e89a233f3a
Parents: 8ed5f12
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Jan 12 14:27:05 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Jan 12 14:27:05 2016 -0800

----------------------------------------------------------------------
 LICENSE                                         |   2 +-
 bin/pyspark                                     |   2 +-
 bin/pyspark2.cmd                                |   2 +-
 core/pom.xml                                    |   2 +-
 .../apache/spark/api/python/PythonUtils.scala   |   2 +-
 dev/deps/spark-deps-hadoop-2.2                  |   2 +-
 dev/deps/spark-deps-hadoop-2.3                  |   2 +-
 dev/deps/spark-deps-hadoop-2.4                  |   2 +-
 dev/deps/spark-deps-hadoop-2.6                  |   2 +-
 python/docs/Makefile                            |   2 +-
 python/lib/py4j-0.9-src.zip                     | Bin 44846 -> 0 bytes
 python/lib/py4j-0.9.1-src.zip                   | Bin 0 -> 47035 bytes
 python/pyspark/streaming/context.py             |  89 +------------------
 python/pyspark/streaming/util.py                |   3 +-
 sbin/spark-config.sh                            |   2 +-
 .../streaming/api/python/PythonDStream.scala    |  10 ---
 .../org/apache/spark/deploy/yarn/Client.scala   |   4 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    |   4 +-
 18 files changed, 20 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index a2f75b8..9c944ac 100644
--- a/LICENSE
+++ b/LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 5eaa17d..2ac4a8b 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/bin/pyspark2.cmd
----------------------------------------------------------------------
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index a97d884..51d6d15 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 34ecb19..3bec5de 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.9</version>
+      <version>0.9.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 2d97cd9..bda8727 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
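
For reference, the same search path can be assembled on the Python side; a minimal sketch that mirrors PythonUtils.sparkPythonPath above rather than reproducing any PySpark API (assumes SPARK_HOME is set):

    import os
    import sys

    # pyspark.zip plus the bundled Py4J source zip, as in sparkPythonPath.
    spark_home = os.environ["SPARK_HOME"]
    sys.path.insert(0, os.path.join(spark_home, "python", "lib", "pyspark.zip"))
    sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.9.1-src.zip"))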

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index cd3ff29..53034a2 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -160,7 +160,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 0985089..a23e260 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 50f0626..6bedbed 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 2b6ca98..7bfad57 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -158,7 +158,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
 pyrolite-4.9.jar
 quasiquotes_2.10-2.0.0-M8.jar
 reflectasm-1.07-shaded.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/python/docs/Makefile
----------------------------------------------------------------------
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 4cec74f..b6d24d8 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD   = sphinx-build
 PAPER         =
 BUILDDIR      = _build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
 
 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/python/lib/py4j-0.9-src.zip
----------------------------------------------------------------------
diff --git a/python/lib/py4j-0.9-src.zip b/python/lib/py4j-0.9-src.zip
deleted file mode 100644
index dace2d0..0000000
Binary files a/python/lib/py4j-0.9-src.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/python/lib/py4j-0.9.1-src.zip
----------------------------------------------------------------------
diff --git a/python/lib/py4j-0.9.1-src.zip b/python/lib/py4j-0.9.1-src.zip
new file mode 100644
index 0000000..fedde84
Binary files /dev/null and b/python/lib/py4j-0.9.1-src.zip differ

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 0f1f005..ec3ad99 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,7 +19,6 @@ from __future__ import print_function
 
 import os
 import sys
-from threading import RLock, Timer
 
 from py4j.java_gateway import java_import, JavaObject
 
@@ -33,63 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
 __all__ = ["StreamingContext"]
 
 
-class Py4jCallbackConnectionCleaner(object):
-
-    """
-    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-    It will scan all callback connections every 30 seconds and close the dead connections.
-    """
-
-    def __init__(self, gateway):
-        self._gateway = gateway
-        self._stopped = False
-        self._timer = None
-        self._lock = RLock()
-
-    def start(self):
-        if self._stopped:
-            return
-
-        def clean_closed_connections():
-            from py4j.java_gateway import quiet_close, quiet_shutdown
-
-            callback_server = self._gateway._callback_server
-            if callback_server:
-                with callback_server.lock:
-                    try:
-                        closed_connections = []
-                        for connection in callback_server.connections:
-                            if not connection.isAlive():
-                                quiet_close(connection.input)
-                                quiet_shutdown(connection.socket)
-                                quiet_close(connection.socket)
-                                closed_connections.append(connection)
-
-                        for closed_connection in closed_connections:
-                            callback_server.connections.remove(closed_connection)
-                    except Exception:
-                        import traceback
-                        traceback.print_exc()
-
-            self._start_timer(clean_closed_connections)
-
-        self._start_timer(clean_closed_connections)
-
-    def _start_timer(self, f):
-        with self._lock:
-            if not self._stopped:
-                self._timer = Timer(30.0, f)
-                self._timer.daemon = True
-                self._timer.start()
-
-    def stop(self):
-        with self._lock:
-            self._stopped = True
-            if self._timer:
-                self._timer.cancel()
-                self._timer = None
-
-
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -105,9 +47,6 @@ class StreamingContext(object):
     # Reference to a currently active StreamingContext
     _activeContext = None
 
-    # A cleaner to clean leak sockets of callback server every 30 seconds
-    _py4j_cleaner = None
-
     def __init__(self, sparkContext, batchDuration=None, jssc=None):
         """
         Create a new StreamingContext.
@@ -155,34 +94,12 @@ class StreamingContext(object):
             # get the GatewayServer object in JVM by ID
             jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
             # update the port of CallbackClient with real port
-            gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
-            _py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
-            _py4j_cleaner.start()
+            jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)
 
         # register serializer for TransformFunction
         # it happens before creating SparkContext when loading from checkpointing
-        if cls._transformerSerializer is None:
-            transformer_serializer = TransformFunctionSerializer()
-            transformer_serializer.init(
-                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
-            # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
-            # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
-            # (https://github.com/bartdag/py4j/pull/184)
-            #
-            # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
-            # calling "registerSerializer". If we call "registerSerializer" twice, the second
-            # PythonProxyHandler will override the first one, then the first one will be GCed and
-            # trigger "PythonProxyHandler.finalize". To avoid that, we should not call
-            # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
-            # be GCed.
-            #
-            # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
-            transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
-                transformer_serializer)
-            cls._transformerSerializer = transformer_serializer
-        else:
-            cls._transformerSerializer.init(
-                SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+        cls._transformerSerializer = TransformFunctionSerializer(
+            SparkContext._active_spark_context, CloudPickleSerializer(), gw)
 
     @classmethod
     def getOrCreate(cls, checkpointPath, setupFunc):
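
The new call path uses GatewayServer.resetCallbackClient, available as of Py4J 0.9.1, in place of the reflection-based PythonDStream.updatePythonGatewayPort helper removed further down. A standalone sketch of the same sequence, assuming an already-started callback server on a gateway bound to `gw`:

    from py4j.java_gateway import JavaObject

    # Look up the JVM-side GatewayServer by its registered ID, as the code above does.
    jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
    # Py4J 0.9.1: point the JVM's callback client at the real Python callback port.
    jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)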

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/python/pyspark/streaming/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index e617fc9..abbbf6e 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -89,10 +89,11 @@ class TransformFunctionSerializer(object):
     it uses this class to invoke Python, which returns the serialized function
     as a byte array.
     """
-    def init(self, ctx, serializer, gateway=None):
+    def __init__(self, ctx, serializer, gateway=None):
         self.ctx = ctx
         self.serializer = serializer
         self.gateway = gateway or self.ctx._gateway
+        self.gateway.jvm.PythonDStream.registerSerializer(self)
         self.failure = None
 
     def dumps(self, id):
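
With this change the serializer registers itself with the JVM in its constructor, so re-initialization after checkpoint recovery simply builds a new instance. A small usage sketch (assumes an active SparkContext bound to `sc`; illustrative only):

    from pyspark.serializers import CloudPickleSerializer
    from pyspark.streaming.util import TransformFunctionSerializer

    # The constructor calls gateway.jvm.PythonDStream.registerSerializer(self),
    # so registration happens exactly once per instance.
    serializer = TransformFunctionSerializer(sc, CloudPickleSerializer(), sc._gateway)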

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/sbin/spark-config.sh
----------------------------------------------------------------------
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index d8d9d00..0c37985 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 953fe95..8c9becc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,16 +170,6 @@ private[python] object PythonDStream {
   }
 
   /**
-   * Update the port of callback client to `port`
-   */
-  def updatePythonGatewayPort(gws: GatewayServer, port: Int): Unit = {
-    val cl = gws.getCallbackClient
-    val f = cl.getClass.getDeclaredField("port")
-    f.setAccessible(true)
-    f.setInt(cl, port)
-  }
-
-  /**
    * helper function for DStream.foreachRDD(),
    * cannot be `foreachRDD`, it will confusing py4j
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8cf438b..d4ca255 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1044,9 +1044,9 @@ private[spark] class Client(
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         require(pyArchivesFile.exists(),
           "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-        val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
+        val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
         require(py4jFile.exists(),
-          "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
+          "py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
         Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
       }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f60651c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6db012a..b91c4be 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -151,9 +151,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // When running tests, let's not assume the user has built the assembly module, which also
     // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
     // needed locations.
-    val sparkHome = sys.props("spark.test.home");
+    val sparkHome = sys.props("spark.test.home")
     val pythonPath = Seq(
-        s"$sparkHome/python/lib/py4j-0.9-src.zip",
+        s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
         s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),

