You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/10/20 01:18:24 UTC
spark git commit: [SPARK-11114][PYSPARK] add getOrCreate for
SparkContext/SQLContext in Python
Repository: spark
Updated Branches:
refs/heads/master a1413b366 -> 232d7f8d4
[SPARK-11114][PYSPARK] add getOrCreate for SparkContext/SQLContext in Python
Also added SQLContext.newSession()
Author: Davies Liu <da...@databricks.com>
Closes #9122 from davies/py_create.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/232d7f8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/232d7f8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/232d7f8d
Branch: refs/heads/master
Commit: 232d7f8d42950431f1d9be2a6bb3591fb6ea20d6
Parents: a1413b3
Author: Davies Liu <da...@databricks.com>
Authored: Mon Oct 19 16:18:20 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Oct 19 16:18:20 2015 -0700
----------------------------------------------------------------------
python/pyspark/context.py | 16 ++++++++++++++--
python/pyspark/sql/context.py | 27 +++++++++++++++++++++++++++
python/pyspark/sql/tests.py | 14 ++++++++++++++
python/pyspark/tests.py | 4 ++++
4 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 4969d85..afd74d9 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -21,7 +21,7 @@ import os
import shutil
import signal
import sys
-from threading import Lock
+from threading import RLock
from tempfile import NamedTemporaryFile
from pyspark import accumulators
@@ -65,7 +65,7 @@ class SparkContext(object):
_jvm = None
_next_accum_id = 0
_active_spark_context = None
- _lock = Lock()
+ _lock = RLock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
@@ -280,6 +280,18 @@ class SparkContext(object):
"""
self.stop()
@classmethod
def getOrCreate(cls, conf=None):
    """
    Get or instantiate a SparkContext and register it as a singleton object.

    :param conf: SparkConf (optional)
    """
    # Serialize on the class-level RLock so two threads racing here cannot
    # both construct a SparkContext; the constructor itself registers the
    # new instance as the active singleton.
    with SparkContext._lock:
        if SparkContext._active_spark_context is None:
            effective_conf = conf if conf is not None else SparkConf()
            SparkContext(conf=effective_conf)
        return SparkContext._active_spark_context
def setLogLevel(self, logLevel):
"""
Control our logLevel. This overrides any user-defined log settings.
http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 89c8c6e..7945365 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -75,6 +75,8 @@ class SQLContext(object):
SQLContext in the JVM, instead we make all calls to this object.
"""
+ _instantiatedContext = None
+
@ignore_unicode_prefix
def __init__(self, sparkContext, sqlContext=None):
"""Creates a new SQLContext.
@@ -99,6 +101,8 @@ class SQLContext(object):
self._scala_SQLContext = sqlContext
_monkey_patch_RDD(self)
install_exception_handler()
+ if SQLContext._instantiatedContext is None:
+ SQLContext._instantiatedContext = self
@property
def _ssql_ctx(self):
@@ -111,6 +115,29 @@ class SQLContext(object):
self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
return self._scala_SQLContext
@classmethod
@since(1.6)
def getOrCreate(cls, sc):
    """
    Get the existing SQLContext or create a new one with given SparkContext.

    If a cached SQLContext exists but its underlying SparkContext has been
    stopped (PySpark's ``SparkContext.stop()`` clears ``_jsc``), the stale
    cache is discarded and a fresh SQLContext is created, instead of handing
    callers a context bound to a dead JVM gateway.

    :param sc: SparkContext
    """
    # NOTE(review): not thread-safe — concurrent first calls may race on the
    # cache; callers on the driver thread (the usual case) are unaffected.
    if (cls._instantiatedContext is None
            or cls._instantiatedContext._sc._jsc is None):
        # Drop any stale reference first: __init__ only registers itself
        # when the cache slot is empty (see the `is None` check there).
        cls._instantiatedContext = None
        jsqlContext = sc._jvm.SQLContext.getOrCreate(sc._jsc.sc())
        cls(sc, jsqlContext)
    return cls._instantiatedContext
+
@since(1.6)
def newSession(self):
    """
    Returns a new SQLContext as new session, that has separate SQLConf,
    registered temporary tables and UDFs, but shared SparkContext and
    table cache.
    """
    # Delegate session creation to the JVM-side SQLContext, then wrap the
    # result in type(self) so subclasses (e.g. HiveContext) stay subclasses.
    return self.__class__(self._sc, self._ssql_ctx.newSession())
+
@since(1.3)
def setConf(self, key, value):
"""Sets the given Spark SQL configuration property.
http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 645133b..f465e1f 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -174,6 +174,20 @@ class DataTypeTests(unittest.TestCase):
self.assertEqual(dt.fromInternal(0), datetime.date(1970, 1, 1))
class SQLContextTests(ReusedPySparkTestCase):
    """Tests for the SQLContext singleton helpers: getOrCreate and newSession."""

    def test_get_or_create(self):
        # Repeated calls must hand back the very same singleton instance.
        ctx = SQLContext.getOrCreate(self.sc)
        self.assertTrue(SQLContext.getOrCreate(self.sc) is ctx)

    def test_new_session(self):
        # A new session shares the SparkContext but owns a separate SQLConf,
        # so the same key can hold different values in each session.
        base = SQLContext.getOrCreate(self.sc)
        base.setConf("test_key", "a")
        session = base.newSession()
        session.setConf("test_key", "b")
        self.assertEqual(base.getConf("test_key", ""), "a")
        self.assertEqual(session.getConf("test_key", ""), "b")
+
class SQLTests(ReusedPySparkTestCase):
@classmethod
http://git-wip-us.apache.org/repos/asf/spark/blob/232d7f8d/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 63cc87e..3c51809 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1883,6 +1883,10 @@ class ContextTests(unittest.TestCase):
# Regression test for SPARK-1550
self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name"))
def test_get_or_create(self):
    # Using the context-manager form: SparkContext.__exit__ calls stop(),
    # so the singleton is torn down when this test finishes.
    with SparkContext.getOrCreate() as active:
        self.assertTrue(SparkContext.getOrCreate() is active)
+
def test_stop(self):
sc = SparkContext()
self.assertNotEqual(SparkContext._active_spark_context, None)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org