Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/20 19:54:38 UTC

spark git commit: [SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9

Repository: spark
Updated Branches:
  refs/heads/master 94139557c -> e18b571c3


[SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9

Upgrade to Py4J 0.9.

This bumps the bundled py4j source zip and the Maven py4j dependency from 0.8.2.1 to 0.9, replaces the callback-server monkey-patch in pyspark.streaming.context with Py4J's CallbackServerParameters API, and switches the Py4JJavaError imports in the streaming modules to py4j.protocol.

Author: Holden Karau <ho...@pigscanfly.ca>
Author: Holden Karau <ho...@us.ibm.com>

Closes #8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e18b571c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e18b571c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e18b571c

Branch: refs/heads/master
Commit: e18b571c3374ecbfc0b20a5064cb58d57a2a7d21
Parents: 9413955
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Tue Oct 20 10:52:49 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Oct 20 10:52:49 2015 -0700

----------------------------------------------------------------------
 LICENSE                                         |   2 +-
 bin/pyspark                                     |   2 +-
 bin/pyspark2.cmd                                |   2 +-
 core/pom.xml                                    |   2 +-
 .../apache/spark/api/python/PythonUtils.scala   |   2 +-
 python/docs/Makefile                            |   2 +-
 python/lib/py4j-0.8.2.1-src.zip                 | Bin 37562 -> 0 bytes
 python/lib/py4j-0.9-src.zip                     | Bin 0 -> 44846 bytes
 python/pyspark/streaming/context.py             |  54 +++----------------
 python/pyspark/streaming/flume.py               |   2 +-
 python/pyspark/streaming/kafka.py               |   2 +-
 python/pyspark/streaming/kinesis.py             |   2 +-
 python/pyspark/streaming/mqtt.py                |   2 +-
 python/pyspark/streaming/tests.py               |  18 ++++---
 sbin/spark-config.sh                            |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   4 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala    |   2 +-
 17 files changed, 34 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index dca03ab..790476e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 8f2a3b5..18012ee 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -65,7 +65,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/bin/pyspark2.cmd
----------------------------------------------------------------------
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 3c61699..a97d884 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index fdcb6a7..319a500 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -350,7 +350,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.8.2.1</version>
+      <version>0.9</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 31e534f..292ac4c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/docs/Makefile
----------------------------------------------------------------------
diff --git a/python/docs/Makefile b/python/docs/Makefile
index 8a1324e..4cec74f 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD   = sphinx-build
 PAPER         =
 BUILDDIR      = _build
 
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.8.2.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
 
 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/lib/py4j-0.8.2.1-src.zip
----------------------------------------------------------------------
diff --git a/python/lib/py4j-0.8.2.1-src.zip b/python/lib/py4j-0.8.2.1-src.zip
deleted file mode 100644
index 5203b84..0000000
Binary files a/python/lib/py4j-0.8.2.1-src.zip and /dev/null differ

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/lib/py4j-0.9-src.zip
----------------------------------------------------------------------
diff --git a/python/lib/py4j-0.9-src.zip b/python/lib/py4j-0.9-src.zip
new file mode 100644
index 0000000..dace2d0
Binary files /dev/null and b/python/lib/py4j-0.9-src.zip differ

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index a8c9ffc..975c754 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -32,48 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
 __all__ = ["StreamingContext"]
 
 
-def _daemonize_callback_server():
-    """
-    Hack Py4J to daemonize callback server
-
-    The thread of callback server has daemon=False, it will block the driver
-    from exiting if it's not shutdown. The following code replace `start()`
-    of CallbackServer with a new version, which set daemon=True for this
-    thread.
-
-    Also, it will update the port number (0) with real port
-    """
-    # TODO: create a patch for Py4J
-    import socket
-    import py4j.java_gateway
-    logger = py4j.java_gateway.logger
-    from py4j.java_gateway import Py4JNetworkError
-    from threading import Thread
-
-    def start(self):
-        """Starts the CallbackServer. This method should be called by the
-        client instead of run()."""
-        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
-                                      1)
-        try:
-            self.server_socket.bind((self.address, self.port))
-            if not self.port:
-                # update port with real port
-                self.port = self.server_socket.getsockname()[1]
-        except Exception as e:
-            msg = 'An error occurred while trying to start the callback server: %s' % e
-            logger.exception(msg)
-            raise Py4JNetworkError(msg)
-
-        # Maybe thread needs to be cleanup up?
-        self.thread = Thread(target=self.run)
-        self.thread.daemon = True
-        self.thread.start()
-
-    py4j.java_gateway.CallbackServer.start = start
-
-
 class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ class StreamingContext(object):
 
         # start callback server
         # getattr will fallback to JVM, so we cannot test by hasattr()
-        if "_callback_server" not in gw.__dict__:
-            _daemonize_callback_server()
-            # use random port
-            gw._start_callback_server(0)
+        if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+            gw.callback_server_parameters.eager_load = True
+            gw.callback_server_parameters.daemonize = True
+            gw.callback_server_parameters.daemonize_connections = True
+            gw.callback_server_parameters.port = 0
+            gw.start_callback_server(gw.callback_server_parameters)
+            cbport = gw._callback_server.server_socket.getsockname()[1]
+            gw._callback_server.port = cbport
             # gateway with real port
             gw._python_proxy_port = gw._callback_server.port
             # get the GatewayServer object in JVM by ID
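
For context on the hunk above: Py4J 0.9 exposes the callback server's
configuration through CallbackServerParameters, which is what makes the removed
_daemonize_callback_server() monkey-patch unnecessary. A minimal sketch of the
same pattern outside Spark (assuming a JVM-side GatewayServer is already
listening on Py4J's default port; everything other than the Py4J calls shown in
the diff is illustrative):

    from py4j.java_gateway import JavaGateway, CallbackServerParameters

    gateway = JavaGateway()  # connects to the JVM gateway on the default port
    params = CallbackServerParameters(
        port=0,                   # 0 lets the OS pick a free port
        daemonize=True,           # daemon thread, so it cannot block exit
        daemonize_connections=True,
        eager_load=True)          # start listening immediately
    gateway.start_callback_server(params)

    # As in the new Spark code above, the real port is read back from the
    # server socket once the callback server is running.
    real_port = gateway._callback_server.server_socket.getsockname()[1]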

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index c0cdc50..b3d1905 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -20,7 +20,7 @@ if sys.version >= "3":
     from io import BytesIO
 else:
     from StringIO import StringIO
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index 8a814c6..b35bbaf 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.rdd import RDD
 from pyspark.storagelevel import StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/kinesis.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index 34be588..af72c3d 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.serializers import PairDeserializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/mqtt.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index fa83006..1ce4093 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from py4j.java_gateway import Py4JJavaError
+from py4j.protocol import Py4JJavaError
 
 from pyspark.storagelevel import StorageLevel
 from pyspark.serializers import UTF8Deserializer
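
A note on the four one-line import changes above (flume, kafka, kinesis, mqtt):
Py4JJavaError's canonical home is py4j.protocol, and with Py4J 0.9 these modules
import it from there directly rather than via py4j.java_gateway. A minimal
sketch of catching it under the new import (the proxy object and method name are
illustrative, not part of any real API):

    from py4j.protocol import Py4JJavaError

    def call_jvm_safely(jvm_object):
        """Invoke a method on a JVM-side proxy obtained through a Py4J gateway,
        reporting Java-side failures instead of propagating them."""
        try:
            return jvm_object.someMethod()  # hypothetical JVM method
        except Py4JJavaError as e:
            # java_exception holds the underlying Java exception object
            print(e.java_exception)
            return None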

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index e4e56ff..4963425 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -61,9 +61,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
     def tearDownClass(cls):
         cls.sc.stop()
         # Clean up in the JVM just in case there has been some issues in Python API
-        jSparkContextOption = SparkContext._jvm.SparkContext.get()
-        if jSparkContextOption.nonEmpty():
-            jSparkContextOption.get().stop()
+        try:
+            jSparkContextOption = SparkContext._jvm.SparkContext.get()
+            if jSparkContextOption.nonEmpty():
+                jSparkContextOption.get().stop()
+        except:
+            pass
 
     def setUp(self):
         self.ssc = StreamingContext(self.sc, self.duration)
@@ -72,9 +75,12 @@ class PySparkStreamingTestCase(unittest.TestCase):
         if self.ssc is not None:
             self.ssc.stop(False)
         # Clean up in the JVM just in case there has been some issues in Python API
-        jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
-        if jStreamingContextOption.nonEmpty():
-            jStreamingContextOption.get().stop(False)
+        try:
+            jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+            if jStreamingContextOption.nonEmpty():
+                jStreamingContextOption.get().stop(False)
+        except:
+            pass
 
     def wait_for(self, result, n):
         start_time = time.time()

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/sbin/spark-config.sh
----------------------------------------------------------------------
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index b0361d7..e6bf544 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -36,4 +36,4 @@ export SPARK_HOME="${SPARK_PREFIX}"
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"
-export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH"

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08aecfa..754215d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1008,9 +1008,9 @@ private[spark] class Client(
         val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
         require(pyArchivesFile.exists(),
           "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-        val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+        val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
         require(py4jFile.exists(),
-          "py4j-0.8.2.1-src.zip not found; cannot run pyspark application in YARN mode.")
+          "py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
         Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
       }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e18b571c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d1cd0c8..6db012a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -153,7 +153,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // needed locations.
     val sparkHome = sys.props("spark.test.home");
     val pythonPath = Seq(
-        s"$sparkHome/python/lib/py4j-0.8.2.1-src.zip",
+        s"$sparkHome/python/lib/py4j-0.9-src.zip",
         s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),

