Posted to commits@systemml.apache.org by ni...@apache.org on 2017/09/05 17:04:14 UTC

systemml git commit: [MINOR] Enable systemml to be imported in PySpark workers

Repository: systemml
Updated Branches:
  refs/heads/master 0ba9e74b9 -> b34079a28


[MINOR] Enable systemml to be imported in PySpark workers

- Moved the race condition avoidance logic to the classloader instead of
  the import level. This avoids the creation of a DataFrame in PySpark
  worker processes.

Closes #652.
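
The change applies a lazy, one-time initialization pattern: rather than
creating a SparkSession (and a dummy DataFrame) as a side effect of
`import systemml`, the work is deferred into `_createJavaObject`, which
only runs on the driver. A minimal sketch of the pattern; the names
`_initialized` and `_ensure_initialized` are illustrative and not part
of the patch:

    _initialized = False

    def _ensure_initialized():
        # Runs at most once per process, and only when called explicitly
        # from driver-side code, never as a side effect of importing the
        # module, so PySpark worker processes are unaffected.
        global _initialized
        if not _initialized:
            _initialized = True
            pass  # one-time driver-only setup goes here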


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b34079a2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b34079a2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b34079a2

Branch: refs/heads/master
Commit: b34079a283a1859ed23de77f4ff0e50985b57dd3
Parents: 0ba9e74
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Tue Sep 5 09:59:59 2017 -0700
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Tue Sep 5 10:01:55 2017 -0700

----------------------------------------------------------------------
 src/main/python/systemml/classloader.py | 14 +++++++++++++-
 src/main/python/systemml/mlcontext.py   |  4 ----
 2 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b34079a2/src/main/python/systemml/classloader.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/classloader.py b/src/main/python/systemml/classloader.py
index 8738dc5..015a3dc 100644
--- a/src/main/python/systemml/classloader.py
+++ b/src/main/python/systemml/classloader.py
@@ -22,15 +22,27 @@
 __all__ = ['createJavaObject']
 
 import os
+import numpy as np
+import pandas as pd
 
 try:
     import py4j.java_gateway
     from py4j.java_gateway import JavaObject
     from pyspark import SparkContext
+    from pyspark.sql import SparkSession
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
 
+_initializedSparkSession = False
 def _createJavaObject(sc, obj_type):
+    # -----------------------------------------------------------------------------------
+    # Avoids a race condition between the locking of metastore_db by the Scala SparkSession and the PySpark SparkSession.
+    # This is done at toDF() rather than at the import level to avoid the creation of a SparkSession in worker processes.
+    global _initializedSparkSession
+    if not _initializedSparkSession:
+        _initializedSparkSession = True
+        SparkSession.builder.getOrCreate().createDataFrame(pd.DataFrame(np.array([[1,2],[3,4]])))
+    # -----------------------------------------------------------------------------------
     if obj_type == 'mlcontext':
         return sc._jvm.org.apache.sysml.api.mlcontext.MLContext(sc._jsc)
     elif obj_type == 'dummy':
@@ -89,4 +101,4 @@ def createJavaObject(sc, obj_type):
         jar_file_name = _getJarFileName(sc, '-extra')
         x = _getLoaderInstance(sc, jar_file_name, 'org.apache.sysml.api.dl.Caffe2DMLLoader', hint + 'systemml-*-extra.jar')
         x.loadCaffe2DML(jar_file_name)
-        return ret
\ No newline at end of file
+        return ret
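
For context, the failure mode the patch fixes: any closure shipped to an
executor that does `import systemml` would previously try to create a
SparkSession inside the worker process. A hypothetical reproduction, not
part of the patch, assuming a PySpark environment with the systemml
package installed:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    def uses_systemml(x):
        # Before this patch, `import systemml` eagerly ran
        # SparkSession.builder.getOrCreate().createDataFrame(...) at
        # import time, which fails inside a PySpark worker process.
        import systemml
        return x

    # With the lazy initialization in classloader.py, the import inside
    # the worker no longer creates a SparkSession, so this map succeeds.
    print(sc.parallelize([1, 2, 3]).map(uses_systemml).collect())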

http://git-wip-us.apache.org/repos/asf/systemml/blob/b34079a2/src/main/python/systemml/mlcontext.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mlcontext.py b/src/main/python/systemml/mlcontext.py
index 60705c5..4a555f7 100644
--- a/src/main/python/systemml/mlcontext.py
+++ b/src/main/python/systemml/mlcontext.py
@@ -36,11 +36,7 @@ try:
     from pyspark import SparkContext
     from pyspark.conf import SparkConf
     import pyspark.mllib.common
-    # -----------------------------------------------------------------------------------
-    # Avoids race condition between locking of metastore_db of Scala SparkSession and PySpark SparkSession
     from pyspark.sql import SparkSession
-    SparkSession.builder.getOrCreate().createDataFrame(pd.DataFrame(np.array([[1,2],[3,4]])))
-    # -----------------------------------------------------------------------------------
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
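
Driver-side behavior is unchanged: the dummy DataFrame is still created
exactly once, just at the first use of the API rather than at import
time. A short driver-side example using the documented MLContext API
(run in a PySpark shell or driver script, not inside a worker):

    from pyspark import SparkContext
    from systemml import MLContext, dml

    sc = SparkContext.getOrCreate()
    ml = MLContext(sc)  # first driver-side call triggers the one-time init
    # a trivial DML script to confirm the context works as before
    ml.execute(dml('print("MLContext is up")'))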