You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2017/11/17 21:01:40 UTC
systemml git commit: [SYSTEMML-1658] Support nested jvm_stdout and
also enable routing of DML print statements from JVM back to Python
Repository: systemml
Updated Branches:
refs/heads/master f0e369407 -> 43c28cf79
[SYSTEMML-1658] Support nested jvm_stdout and also enable routing of DML print statements from JVM back to Python
- Since Python API is often used in notebook environment, jvm_stdout
needs to be supported as default functionality to simplify SystemML's
usage. The methods that output JVM's stdout (such as mlcontext's execute
or mllearn's fit/transform/load/save methods) are encapsulated with
jvm_stdout.
As an example, instead of:
from systemml import jvm_stdout
with jvm_stdout():
ml.execute(dml(script))
one now just has to do:
ml.execute(dml(script))
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/43c28cf7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/43c28cf7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/43c28cf7
Branch: refs/heads/master
Commit: 43c28cf79ebec65b194c2bf78b18d6db354f4e11
Parents: f0e3694
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Fri Nov 17 12:58:23 2017 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Fri Nov 17 13:00:47 2017 -0800
----------------------------------------------------------------------
src/main/python/systemml/classloader.py | 92 ++++++++++++++++++++-
src/main/python/systemml/mlcontext.py | 75 ++---------------
src/main/python/systemml/mllearn/estimators.py | 70 +++++++++++++---
3 files changed, 160 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/classloader.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/classloader.py b/src/main/python/systemml/classloader.py
index 015a3dc..fb0a731 100644
--- a/src/main/python/systemml/classloader.py
+++ b/src/main/python/systemml/classloader.py
@@ -19,11 +19,12 @@
#
#-------------------------------------------------------------
-__all__ = ['createJavaObject']
+__all__ = ['createJavaObject', 'jvm_stdout', 'default_jvm_stdout', 'default_jvm_stdout_parallel_flush', 'set_default_jvm_stdout', 'get_spark_context' ]
import os
import numpy as np
import pandas as pd
+import threading, time
try:
import py4j.java_gateway
@@ -33,6 +34,95 @@ try:
except ImportError:
raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.')
+_loadedSystemML = False
+def get_spark_context():
+ """
+ Internal method to get already initialized SparkContext. Developers should always use
+ get_spark_context() instead of SparkContext._active_spark_context to ensure SystemML loaded.
+
+ Returns
+ -------
+ sc: SparkContext
+ SparkContext
+ """
+ if SparkContext._active_spark_context is not None:
+ sc = SparkContext._active_spark_context
+ global _loadedSystemML
+ if not _loadedSystemML:
+ createJavaObject(sc, 'dummy')
+ _loadedSystemML = True
+ return sc
+ else:
+ raise Exception('Expected spark context to be created.')
+
+_in_jvm_stdout = False
+default_jvm_stdout = True
+default_jvm_stdout_parallel_flush = False
+def set_default_jvm_stdout(enable, parallel_flush=False):
+ """
+ This is useful utility method to get the output of the driver JVM from within a Jupyter notebook
+
+ Parameters
+ ----------
+ enable: boolean
+ Should flush the stdout by default when mlcontext.execute is invoked
+
+ parallel_flush: boolean
+ Should flush the stdout in parallel
+ """
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
+ default_jvm_stdout = enable
+ default_jvm_stdout_parallel_flush = parallel_flush
+
+# This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
+# Example usage:
+# with jvm_stdout():
+# ml.execute(script)
+class jvm_stdout(object):
+ """
+ This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
+
+ Parameters
+ ----------
+ parallel_flush: boolean
+ Should flush the stdout in parallel
+ """
+ def __init__(self, parallel_flush=False):
+ self.util = get_spark_context()._jvm.org.apache.sysml.api.ml.Utils()
+ self.parallel_flush = parallel_flush
+ self.t = threading.Thread(target=self.flush_stdout)
+ self.stop = False
+
+ def flush_stdout(self):
+ while not self.stop:
+ time.sleep(1) # flush stdout every 1 second
+ str = self.util.flushStdOut()
+ if str != '':
+ str = str[:-1] if str.endswith('\n') else str
+ print(str)
+
+ def __enter__(self):
+ global _in_jvm_stdout
+ if _in_jvm_stdout:
+ # Allow for nested jvm_stdout
+ self.donotRedirect = True
+ else:
+ self.donotRedirect = False
+ self.util.startRedirectStdOut()
+ if self.parallel_flush:
+ self.t.start()
+ _in_jvm_stdout = True
+
+ def __exit__(self, *args):
+ global _in_jvm_stdout
+ if not self.donotRedirect:
+ if self.parallel_flush:
+ self.stop = True
+ self.t.join()
+ print(self.util.stopRedirectStdOut())
+ _in_jvm_stdout = False
+
+
_initializedSparkSession = False
def _createJavaObject(sc, obj_type):
# -----------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/mlcontext.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mlcontext.py b/src/main/python/systemml/mlcontext.py
index 28f2d8c..c945b2b 100644
--- a/src/main/python/systemml/mlcontext.py
+++ b/src/main/python/systemml/mlcontext.py
@@ -22,7 +22,7 @@
# Methods to create Script object
script_factory_methods = [ 'dml', 'pydml', 'dmlFromResource', 'pydmlFromResource', 'dmlFromFile', 'pydmlFromFile', 'dmlFromUrl', 'pydmlFromUrl' ]
# Utility methods
-util_methods = [ 'jvm_stdout', '_java2py', 'getHopDAG' ]
+util_methods = [ '_java2py', 'getHopDAG' ]
__all__ = ['MLResults', 'MLContext', 'Script', 'Matrix' ] + script_factory_methods + util_methods
import os
@@ -43,68 +43,6 @@ except ImportError:
from .converters import *
from .classloader import *
-_loadedSystemML = False
-def _get_spark_context():
- """
- Internal method to get already initialized SparkContext. Developers should always use
- _get_spark_context() instead of SparkContext._active_spark_context to ensure SystemML loaded.
-
- Returns
- -------
- sc: SparkContext
- SparkContext
- """
- if SparkContext._active_spark_context is not None:
- sc = SparkContext._active_spark_context
- global _loadedSystemML
- if not _loadedSystemML:
- createJavaObject(sc, 'dummy')
- _loadedSystemML = True
- return sc
- else:
- raise Exception('Expected spark context to be created.')
-
-
-
-# This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
-# Example usage:
-# with jvm_stdout():
-# ml.execute(script)
-class jvm_stdout(object):
- """
- This is useful utility class to get the output of the driver JVM from within a Jupyter notebook
-
- Parameters
- ----------
- parallel_flush: boolean
- Should flush the stdout in parallel
- """
- def __init__(self, parallel_flush=False):
- self.util = _get_spark_context()._jvm.org.apache.sysml.api.ml.Utils()
- self.parallel_flush = parallel_flush
- self.t = threading.Thread(target=self.flush_stdout)
- self.stop = False
-
- def flush_stdout(self):
- while not self.stop:
- time.sleep(1) # flush stdout every 1 second
- str = self.util.flushStdOut()
- if str != '':
- str = str[:-1] if str.endswith('\n') else str
- print(str)
-
- def __enter__(self):
- self.util.startRedirectStdOut()
- if self.parallel_flush:
- self.t.start()
-
- def __exit__(self, *args):
- if self.parallel_flush:
- self.stop = True
- self.t.join()
- print(self.util.stopRedirectStdOut())
-
-
def getHopDAG(ml, script, lines=None, conf=None, apply_rewrites=True, with_subgraph=False):
"""
Compile a DML / PyDML script.
@@ -139,7 +77,7 @@ def getHopDAG(ml, script, lines=None, conf=None, apply_rewrites=True, with_subgr
scriptString = script.scriptString
script_java = script.script_java
lines = [ int(x) for x in lines ] if lines is not None else [int(-1)]
- sc = _get_spark_context()
+ sc = get_spark_context()
if conf is not None:
hopDAG = sc._jvm.org.apache.sysml.api.mlcontext.MLContextUtil.getHopDAG(ml._ml, script_java, lines, conf._jconf, apply_rewrites, with_subgraph)
else:
@@ -414,7 +352,7 @@ class Script(object):
Optional script format, either "auto" or "url" or "file" or "resource" or "string"
"""
def __init__(self, scriptString, scriptType="dml", isResource=False, scriptFormat="auto"):
- self.sc = _get_spark_context()
+ self.sc = get_spark_context()
self.scriptString = scriptString
self.scriptType = scriptType
self.isResource = isResource
@@ -721,7 +659,12 @@ class MLContext(object):
raise ValueError("Expected script to be an instance of Script")
scriptString = script.scriptString
script_java = script.script_java
- return MLResults(self._ml.execute(script_java), self._sc)
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ return MLResults(self._ml.execute(script_java), self._sc)
+ else:
+ return MLResults(self._ml.execute(script_java), self._sc)
def setStatistics(self, statistics):
"""
http://git-wip-us.apache.org/repos/asf/systemml/blob/43c28cf7/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 50412aa..7e6104f 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -148,8 +148,13 @@ class BaseSystemMLEstimator(Estimator):
return self
def _fit_df(self):
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
try:
- self.model = self.estimator.fit(self.X._jdf)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model = self.estimator.fit(self.X._jdf)
+ else:
+ self.model = self.estimator.fit(self.X._jdf)
except Py4JError:
traceback.print_exc()
@@ -160,12 +165,17 @@ class BaseSystemMLEstimator(Estimator):
return self
def _fit_numpy(self):
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
try:
if type(self.y) == np.ndarray and len(self.y.shape) == 1:
# Since we know that mllearn always needs a column vector
self.y = np.matrix(self.y).T
y_mb = convertToMatrixBlock(self.sc, self.y)
- self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
+ else:
+ self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), y_mb)
except Py4JError:
traceback.print_exc()
@@ -284,6 +294,7 @@ class BaseSystemMLEstimator(Estimator):
----------
X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
"""
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
if hasattr(X, '_jdf'):
return self.predict(X)
elif self.transferUsingDF:
@@ -295,7 +306,11 @@ class BaseSystemMLEstimator(Estimator):
pass
try:
jX = self._convertPythonXToJavaObject(X)
- return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
+ else:
+ return self._convertJavaOutputToPythonObject(X, self.model.transform_probability(jX))
except Py4JError:
traceback.print_exc()
@@ -308,6 +323,7 @@ class BaseSystemMLEstimator(Estimator):
----------
X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
"""
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
try:
if self.estimator is not None and self.model is not None:
self.estimator.copyProperties(self.model)
@@ -315,7 +331,11 @@ class BaseSystemMLEstimator(Estimator):
pass
try:
jX = self._convertPythonXToJavaObject(X)
- ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
+ else:
+ ret = self._convertJavaOutputToPythonObject(X, self.model.transform(jX))
return self.decode(ret) if isinstance(X, SUPPORTED_TYPES) else ret
except Py4JError:
traceback.print_exc()
@@ -389,7 +409,12 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
eager: load the model eagerly. This flag should be only used for debugging purposes. (default: False)
"""
self.weights = weights
- self.model.load(self.sc._jsc, weights, sep, eager)
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model.load(self.sc._jsc, weights, sep, eager)
+ else:
+ self.model.load(self.sc._jsc, weights, sep, eager)
self.loadLabels(weights + '/labels.txt')
def save(self, outputDir, format='binary', sep='/'):
@@ -402,8 +427,13 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator):
format: optional format (default: 'binary')
sep: seperator to use (default: '/')
"""
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
if self.model != None:
- self.model.save(self.sc._jsc, outputDir, format, sep)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model.save(self.sc._jsc, outputDir, format, sep)
+ else:
+ self.model.save(self.sc._jsc, outputDir, format, sep)
labelMapping = None
if hasattr(self, 'le') and self.le is not None:
@@ -449,7 +479,12 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator):
eager: load the model eagerly (default: False)
"""
self.weights = weights
- self.model.load(self.sc._jsc, weights, sep, eager)
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model.load(self.sc._jsc, weights, sep, eager)
+ else:
+ self.model.load(self.sc._jsc, weights, sep, eager)
def save(self, outputDir, format='binary', sep='/'):
"""
@@ -461,8 +496,13 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator):
format: optional format (default: 'binary')
sep: seperator to use (default: '/')
"""
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
if self.model != None:
- self.model.save(outputDir, format, sep)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model.save(outputDir, format, sep)
+ else:
+ self.model.save(outputDir, format, sep)
else:
raise Exception('Cannot save as you need to train the model first using fit')
return self
@@ -789,10 +829,15 @@ class Caffe2DML(BaseSystemMLClassifier):
ignore_weights: names of layers to not read from the weights directory (list of string, default:None)
eager: load the model eagerly (default: False)
"""
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
self.weights = weights
self.estimator.setInput("$weights", str(weights))
self.model = self.sc._jvm.org.apache.sysml.api.dl.Caffe2DMLModel(self.estimator)
- self.model.load(self.sc._jsc, weights, sep, eager)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model.load(self.sc._jsc, weights, sep, eager)
+ else:
+ self.model.load(self.sc._jsc, weights, sep, eager)
self.loadLabels(weights + '/labels.txt')
if ignore_weights is not None:
self.estimator.setWeightsToIgnore(ignore_weights)
@@ -821,8 +866,13 @@ class Caffe2DML(BaseSystemMLClassifier):
Print the summary of the network
"""
import pyspark
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
if type(self.sparkSession) == pyspark.sql.session.SparkSession:
- self.estimator.summary(self.sparkSession._jsparkSession)
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.estimator.summary(self.sparkSession._jsparkSession)
+ else:
+ self.estimator.summary(self.sparkSession._jsparkSession)
else:
raise TypeError('Please use spark session of type pyspark.sql.session.SparkSession in the constructor')