You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/01/31 11:04:56 UTC

spark git commit: [SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defaultSession

Repository: spark
Updated Branches:
  refs/heads/master 161a3f2ae -> 3d0911bbe


[SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defaultSession

## What changes were proposed in this pull request?

In the current PySpark code, the Python-created `jsparkSession` is not registered as the JVM's `defaultSession`, so this `SparkSession` object cannot be fetched from the Java side. As a result, the Scala code below fails when it is loaded in a PySpark application.

```scala
// Example listener: on every CreateTableEvent it looks up the new table's metadata
// through whichever SparkSession the JVM currently exposes.
class TestSparkSession extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case CreateTableEvent(db, table) =>
        // Prefer the active session; fall back to the JVM-wide default session.
        val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
        // Fails when neither the active nor the default session is set.
        assert(session.isDefined)
        val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
        logInfo(s"Table info ${tableInfo}")

      // Every other event is only logged.
      case e =>
        logInfo(s"event $e")

    }
  }
}
```

So this PR proposes adding the freshly created `jsparkSession` to `defaultSession`.

## How was this patch tested?

Manual verification.

Author: jerryshao <ss...@hortonworks.com>
Author: hyukjinkwon <gu...@gmail.com>
Author: Saisai Shao <sa...@gmail.com>

Closes #20404 from jerryshao/SPARK-23228.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0911bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0911bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0911bb

Branch: refs/heads/master
Commit: 3d0911bbe47f76c341c090edad3737e88a67e3d7
Parents: 161a3f2
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jan 31 20:04:51 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Wed Jan 31 20:04:51 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/session.py | 10 +++++++++-
 python/pyspark/sql/tests.py   | 28 +++++++++++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d0911bb/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c84023..1ed0429 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,7 +213,12 @@ class SparkSession(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         if jsparkSession is None:
-            jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+            if self._jvm.SparkSession.getDefaultSession().isDefined() \
+                    and not self._jvm.SparkSession.getDefaultSession().get() \
+                        .sparkContext().isStopped():
+                jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+            else:
+                jsparkSession = self._jvm.SparkSession(self._jsc.sc())
         self._jsparkSession = jsparkSession
         self._jwrapped = self._jsparkSession.sqlContext()
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +230,7 @@ class SparkSession(object):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
 
     def _repr_html_(self):
         return """
@@ -759,6 +765,8 @@ class SparkSession(object):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
+        # We should clean the default session up. See SPARK-23228.
+        self._jvm.SparkSession.clearDefaultSession()
         SparkSession._instantiatedSession = None
 
     @since(2.0)

http://git-wip-us.apache.org/repos/asf/spark/blob/3d0911bb/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index dc80870..dc26b96 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -69,7 +69,7 @@ from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
 from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
 from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.sql.types import _merge_type
-from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
+from pyspark.tests import QuietTest, ReusedPySparkTestCase, PySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -2925,6 +2925,32 @@ class SQLTests2(ReusedSQLTestCase):
             sc.stop()
 
 
+class SparkSessionTests(PySparkTestCase):
+
+    # This test is separate because it's closely related with session's start and stop.
+    # See SPARK-23228.
+    def test_set_jvm_default_session(self):
+        spark = SparkSession.builder.getOrCreate()
+        try:
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isDefined())
+        finally:
+            spark.stop()
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isEmpty())
+
+    def test_jvm_default_session_already_set(self):
+        # Here, we assume there is the default session already set in JVM.
+        jsession = self.sc._jvm.SparkSession(self.sc._jsc.sc())
+        self.sc._jvm.SparkSession.setDefaultSession(jsession)
+
+        spark = SparkSession.builder.getOrCreate()
+        try:
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isDefined())
+            # The session should be the same with the exiting one.
+            self.assertTrue(jsession.equals(spark._jvm.SparkSession.getDefaultSession().get()))
+        finally:
+            spark.stop()
+
+
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
         if SparkSession._instantiatedSession is not None:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org