Posted to commits@systemml.apache.org by ni...@apache.org on 2017/11/17 21:01:40 UTC

systemml git commit: [SYSTEMML-1658] Support nested jvm_stdout and also enable routing of DML print statements from JVM back to Python

Repository: systemml
Updated Branches:
  refs/heads/master f0e369407 -> 43c28cf79


[SYSTEMML-1658] Support nested jvm_stdout and also enable routing of DML print statements from JVM back to Python

- Since the Python API is often used in notebook environments,
jvm_stdout needs to be supported as default functionality to simplify
the usage of SystemML. The methods that produce JVM stdout (such as
mlcontext's execute or mllearn's fit/transform/load/save methods) are
now wrapped with jvm_stdout.

As an example, instead of:
from systemml import jvm_stdout
with jvm_stdout():
	ml.execute(dml(script))

one now just has to do:
ml.execute(dml(script))
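
The default can be changed with the newly added set_default_jvm_stdout.
A minimal sketch (illustrative only; assumes a running PySpark session
and that set_default_jvm_stdout is re-exported at the package level, as
jvm_stdout was):

from systemml import set_default_jvm_stdout
# fall back to the old behavior, requiring explicit jvm_stdout blocks
set_default_jvm_stdout(False)
# or keep the default on and flush the JVM stdout in parallel
# (roughly once per second) instead of only on completion
set_default_jvm_stdout(True, parallel_flush=True)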


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/43c28cf7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/43c28cf7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/43c28cf7

Branch: refs/heads/master
Commit: 43c28cf79ebec65b194c2bf78b18d6db354f4e11
Parents: f0e3694
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Fri Nov 17 12:58:23 2017 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Fri Nov 17 13:00:47 2017 -0800

----------------------------------------------------------------------
 src/main/python/systemml/classloader.py        | 92 ++++++++++++++++++++-
 src/main/python/systemml/mlcontext.py          | 75 ++---------------
 src/main/python/systemml/mllearn/estimators.py | 70 +++++++++++++---
 3 files changed, 160 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/classloader.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/classloader.py b/src/main/python/systemml/classloader.py
index 015a3dc..fb0a731 100644
--- a/src/main/python/systemml/classloader.py
+++ b/src/main/python/systemml/classloader.py
@@ -19,11 +19,12 @@
 #
 #-------------------------------------------------------------
 
-__all__ = ['createJavaObject']
+__all__ = ['createJavaObject', 'jvm_stdout', 'default_jvm_stdout', 'default_jvm_stdout_parallel_flush', 'set_default_jvm_stdout', 'get_spark_context' ]
 
 import os
 import numpy as np
 import pandas as pd
+import threading, time
 
 try:
     import py4j.java_gateway
@@ -33,6 +34,95 @@ try:
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
 
+_loadedSystemML = False
+def get_spark_context():
+    """
+    Internal method to get the already initialized SparkContext.  Developers should always use
+    get_spark_context() instead of SparkContext._active_spark_context to ensure SystemML is loaded.
+
+    Returns
+    -------
+    sc: SparkContext
+        SparkContext
+    """
+    if SparkContext._active_spark_context is not None:
+        sc = SparkContext._active_spark_context
+        global _loadedSystemML
+        if not _loadedSystemML:
+            createJavaObject(sc, 'dummy')
+            _loadedSystemML = True
+        return sc
+    else:
+        raise Exception('Expected spark context to be created.')
+        
+_in_jvm_stdout = False
+default_jvm_stdout = True
+default_jvm_stdout_parallel_flush = False
+def set_default_jvm_stdout(enable, parallel_flush=False):
+    """
+    This is a useful utility method to control whether the output of the driver JVM is shown by default from within a Jupyter notebook
+
+    Parameters
+    ----------
+    enable: boolean
+        Whether to flush the stdout by default when mlcontext.execute is invoked
+
+    parallel_flush: boolean
+        Whether to flush the stdout in parallel
+    """
+    global default_jvm_stdout, default_jvm_stdout_parallel_flush
+    default_jvm_stdout = enable
+    default_jvm_stdout_parallel_flush = parallel_flush
+    
+# This is a useful utility class to get the output of the driver JVM from within a Jupyter notebook
+# Example usage:
+# with jvm_stdout():
+#    ml.execute(script)
+class jvm_stdout(object):
+    """
+    This is a useful utility class to get the output of the driver JVM from within a Jupyter notebook
+
+    Parameters
+    ----------
+    parallel_flush: boolean
+        Should flush the stdout in parallel
+    """
+    def __init__(self, parallel_flush=False):
+        self.util = get_spark_context()._jvm.org.apache.sysml.api.ml.Utils()
+        self.parallel_flush = parallel_flush
+        self.t = threading.Thread(target=self.flush_stdout)
+        self.stop = False
+        
+    def flush_stdout(self):
+        while not self.stop: 
+            time.sleep(1) # flush stdout every 1 second
+            str = self.util.flushStdOut()
+            if str != '':
+                str = str[:-1] if str.endswith('\n') else str
+                print(str)
+    
+    def __enter__(self):
+        global _in_jvm_stdout
+        if _in_jvm_stdout:
+            # Allow for nested jvm_stdout    
+            self.donotRedirect = True
+        else:
+            self.donotRedirect = False
+            self.util.startRedirectStdOut()
+            if self.parallel_flush:
+                self.t.start()
+            _in_jvm_stdout = True
+
+    def __exit__(self, *args):
+        global _in_jvm_stdout
+        if not self.donotRedirect:
+            if self.parallel_flush:
+                self.stop = True
+                self.t.join()
+            print(self.util.stopRedirectStdOut())
+            _in_jvm_stdout = False
+
+
 _initializedSparkSession = False
 def _createJavaObject(sc, obj_type):
     # -----------------------------------------------------------------------------------

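With the _in_jvm_stdout guard above, jvm_stdout contexts can now be
nested safely: only the outermost context starts and stops the
redirection, and an inner context is a no-op. This is exactly the case
that arises when mlcontext's execute (which now opens its own
jvm_stdout by default) runs inside a user-supplied jvm_stdout block. A
sketch of the behavior (ml and script as in the commit message example):

from systemml import jvm_stdout
with jvm_stdout():
	# the outer context has started the redirect; this inner context
	# sees _in_jvm_stdout set and neither starts nor stops anything
	with jvm_stdout():
		ml.execute(dml(script))
	# the redirect is still active here; the collected JVM output is
	# printed only when the outer context exits
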
http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/mlcontext.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mlcontext.py b/src/main/python/systemml/mlcontext.py
index 28f2d8c..c945b2b 100644
--- a/src/main/python/systemml/mlcontext.py
+++ b/src/main/python/systemml/mlcontext.py
@@ -22,7 +22,7 @@
 # Methods to create Script object
 script_factory_methods = [ 'dml', 'pydml', 'dmlFromResource', 'pydmlFromResource', 'dmlFromFile', 'pydmlFromFile', 'dmlFromUrl', 'pydmlFromUrl' ]
 # Utility methods
-util_methods = [ 'jvm_stdout', '_java2py',  'getHopDAG' ]
+util_methods = [ '_java2py',  'getHopDAG' ]
 __all__ = ['MLResults', 'MLContext', 'Script', 'Matrix' ] + script_factory_methods + util_methods
 
 import os
@@ -43,68 +43,6 @@ except ImportError:
 from .converters import *
 from .classloader import *
 
-_loadedSystemML = False
-def _get_spark_context():
-    """
-    Internal method to get already initialized SparkContext.  Developers should always use
-    _get_spark_context() instead of SparkContext._active_spark_context to ensure SystemML loaded.
-
-    Returns
-    -------
-    sc: SparkContext
-        SparkContext
-    """
-    if SparkContext._active_spark_context is not None:
-        sc = SparkContext._active_spark_context
-        global _loadedSystemML
-        if not _loadedSystemML:
-            createJavaObject(sc, 'dummy')
-            _loadedSystemML = True
-        return sc
-    else:
-        raise Exception('Expected spark context to be created.')
-
-
-
-# This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
-# Example usage:
-# with jvm_stdout():
-#    ml.execute(script)
-class jvm_stdout(object):
-    """
-    This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
-
-    Parameters
-    ----------
-    parallel_flush: boolean
-        Should flush the stdout in parallel
-    """
-    def __init__(self, parallel_flush=False):
-        self.util = _get_spark_context()._jvm.org.apache.sysml.api.ml.Utils()
-        self.parallel_flush = parallel_flush
-        self.t = threading.Thread(target=self.flush_stdout)
-        self.stop = False
-        
-    def flush_stdout(self):
-        while not self.stop: 
-            time.sleep(1) # flush stdout every 1 second
-            str = self.util.flushStdOut()
-            if str != '':
-                str = str[:-1] if str.endswith('\n') else str
-                print(str)
-    
-    def __enter__(self):
-        self.util.startRedirectStdOut()
-        if self.parallel_flush:
-            self.t.start()
-
-    def __exit__(self, *args):
-        if self.parallel_flush:
-            self.stop = True
-            self.t.join()
-        print(self.util.stopRedirectStdOut())
-        
-
 def getHopDAG(ml, script, lines=None, conf=None, apply_rewrites=True, with_subgraph=False):
     """
     Compile a DML / PyDML script.
@@ -139,7 +77,7 @@ def getHopDAG(ml, script, lines=None, conf=None, apply_rewrites=True, with_subgr
     scriptString = script.scriptString
     script_java = script.script_java
     lines = [ int(x) for x in lines ] if lines is not None else [int(-1)]
-    sc = _get_spark_context()
+    sc = get_spark_context()
     if conf is not None:
         hopDAG = sc._jvm.org.apache.sysml.api.mlcontext.MLContextUtil.getHopDAG(ml._ml, script_java, lines, conf._jconf, apply_rewrites, with_subgraph)
     else:
@@ -414,7 +352,7 @@ class Script(object):
         Optional script format, either "auto" or "url" or "file" or "resource" or "string"
     """
     def __init__(self, scriptString, scriptType="dml", isResource=False, scriptFormat="auto"):
-        self.sc = _get_spark_context()
+        self.sc = get_spark_context()
         self.scriptString = scriptString
         self.scriptType = scriptType
         self.isResource = isResource
@@ -721,7 +659,12 @@ class MLContext(object):
             raise ValueError("Expected script to be an instance of Script")
         scriptString = script.scriptString
         script_java = script.script_java
-        return MLResults(self._ml.execute(script_java), self._sc)
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
+        if default_jvm_stdout:
+            with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                return MLResults(self._ml.execute(script_java), self._sc)
+        else:
+            return MLResults(self._ml.execute(script_java), self._sc)
 
     def setStatistics(self, statistics):
         """

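With default_jvm_stdout enabled (the default), execute now wraps the
script run in jvm_stdout itself, so DML print statements are routed
from the JVM back to Python with no extra code. A minimal sketch
(assumes a notebook with an active SparkContext sc):

from systemml import MLContext, dml
ml = MLContext(sc)
# the script's print output shows up directly in the notebook cell,
# without an explicit "with jvm_stdout():" block
ml.execute(dml('print("hello from DML")'))
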
http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 50412aa..7e6104f 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -148,8 +148,13 @@ class BaseSystemMLEstimator(Estimator):
         return self
     
     def _fit_df(self):
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         try:
-            self.model = self.estimator.fit(self.X._jdf)
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.model = self.estimator.fit(self.X._jdf)
+            else:
+                self.model = self.estimator.fit(self.X._jdf)
         except Py4JError:
             traceback.print_exc()
     
@@ -160,12 +165,17 @@ class BaseSystemMLEstimator(Estimator):
         return self
     
     def _fit_numpy(self):
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         try:
             if type(self.y) == np.ndarray and len(self.y.shape) == 1:
                 # Since we know that mllearn always needs a column vector
                 self.y = np.matrix(self.y).T
             y_mb = convertToMatrixBlock(self.sc, self.y)
-            self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
+            else:
+                self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
         except Py4JError:
             traceback.print_exc()
                     
@@ -284,6 +294,7 @@ class BaseSystemMLEstimator(Estimator):
         ----------
         X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
         """
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         if hasattr(X, '_jdf'):
             return self.predict(X)
         elif self.transferUsingDF:
@@ -295,7 +306,11 @@ class BaseSystemMLEstimator(Estimator):
             pass
         try:
             jX = self._convertPythonXToJavaObject(X)
-            return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
+            else:
+                return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
         except Py4JError:
             traceback.print_exc()
     
@@ -308,6 +323,7 @@ class BaseSystemMLEstimator(Estimator):
         ----------
         X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
         """
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         try:
             if self.estimator is not None and self.model is not None:
                 self.estimator.copyProperties(self.model)
@@ -315,7 +331,11 @@ class BaseSystemMLEstimator(Estimator):
             pass
         try:
             jX = self._convertPythonXToJavaObject(X)
-            ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
+            else:
+                ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
             return self.decode(ret) if isinstance(X, SUPPORTED_TYPES) else ret
         except Py4JError:
             traceback.print_exc()
@@ -389,7 +409,12 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
         eager: load the model eagerly. This flag should be only used for debugging purposes. (default: False)
         """
         self.weights = weights
-        self.model.load(self.sc._jsc, weights, sep, eager)
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
+        if default_jvm_stdout:
+            with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                self.model.load(self.sc._jsc, weights, sep, eager)
+        else:
+            self.model.load(self.sc._jsc, weights, sep, eager)
         self.loadLabels(weights + '/labels.txt')
         
     def save(self, outputDir, format='binary', sep='/'):
@@ -402,8 +427,13 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
         format: optional format (default: 'binary')
         sep: seperator to use (default: '/')
         """
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         if self.model != None:
-            self.model.save(self.sc._jsc, outputDir, format, sep)
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.model.save(self.sc._jsc, outputDir, format, sep)
+            else:
+                self.model.save(self.sc._jsc, outputDir, format, sep)
 
             labelMapping = None
             if hasattr(self, 'le') and self.le is not None:
@@ -449,7 +479,12 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator):
         eager: load the model eagerly (default: False)
         """
         self.weights = weights
-        self.model.load(self.sc._jsc, weights, sep, eager)
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
+        if default_jvm_stdout:
+            with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                self.model.load(self.sc._jsc, weights, sep, eager)
+        else:
+            self.model.load(self.sc._jsc, weights, sep, eager)
 
     def save(self, outputDir, format='binary', sep='/'):
         """
@@ -461,8 +496,13 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator):
         format: optional format (default: 'binary')
         sep: seperator to use (default: '/')
         """
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         if self.model != None:
-            self.model.save(outputDir, format, sep)
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.model.save(outputDir, format, sep)
+            else:
+                self.model.save(outputDir, format, sep)
         else:
             raise Exception('Cannot save as you need to train the model first using fit')
         return self
@@ -789,10 +829,15 @@ class Caffe2DML(BaseSystemMLClassifier):
         ignore_weights: names of layers to not read from the weights directory (list of string, default:None)
         eager: load the model eagerly (default: False)
         """
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         self.weights = weights
         self.estimator.setInput("$weights", str(weights))
         self.model = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator)
-        self.model.load(self.sc._jsc, weights, sep, eager)
+        if default_jvm_stdout:
+            with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                self.model.load(self.sc._jsc, weights, sep, eager)
+        else:
+            self.model.load(self.sc._jsc, weights, sep, eager)
         self.loadLabels(weights + '/labels.txt')
         if ignore_weights is not None:
             self.estimator.setWeightsToIgnore(ignore_weights)
@@ -821,8 +866,13 @@ class Caffe2DML(BaseSystemMLClassifier):
         Print the summary of the network
         """
         import pyspark
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
         if type(self.sparkSession) == pyspark.sql.session.SparkSession:
-            self.estimator.summary(self.sparkSession._jsparkSession)
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.estimator.summary(self.sparkSession._jsparkSession)
+            else:
+                self.estimator.summary(self.sparkSession._jsparkSession)
         else:
             raise TypeError('Please use spark session of type pyspark.sql.session.SparkSession in the constructor')
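
The same default routing applies to the mllearn paths patched above
(fit, the predict/probability helpers, load, save, and Caffe2DML's
summary). A hedged sketch with the existing LogisticRegression wrapper
(assumes an active SparkSession spark and NumPy arrays X_train and
y_train):

from systemml.mllearn import LogisticRegression
lr = LogisticRegression(spark)
# statistics and progress printed by the JVM during fit() now appear
# in the notebook without an explicit jvm_stdout block
lr.fit(X_train, y_train)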