You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/02/27 21:26:21 UTC
systemml git commit: [SYSTEMML-445] Allow users to pass the file
paths to the binary blocked, csv and ijv datasets to mllearn classes.
Repository: systemml
Updated Branches:
refs/heads/master 54a11eed3 -> 8ffa3d158
[SYSTEMML-445] Allow users to pass the file paths to the binary blocked, csv and ijv datasets to mllearn classes.
- This allows advanced users who already have data materialized in
binary blocked format to avoid conversion overhead.
- Also, this facility is useful for benchmarking the performance of
Keras2DML and Caffe2DML.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8ffa3d15
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8ffa3d15
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8ffa3d15
Branch: refs/heads/master
Commit: 8ffa3d158fe97e0871bcd4b77fa27504b8b85502
Parents: 54a11ee
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Tue Feb 27 13:21:32 2018 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Tue Feb 27 13:21:32 2018 -0800
----------------------------------------------------------------------
src/main/python/systemml/mllearn/estimators.py | 28 +++++++++--
.../org/apache/sysml/api/dl/Caffe2DML.scala | 22 +++++++++
.../sysml/api/ml/BaseSystemMLClassifier.scala | 49 ++++++++++++++++++++
.../sysml/api/ml/BaseSystemMLRegressor.scala | 23 +++++++++
.../apache/sysml/api/ml/LinearRegression.scala | 9 +++-
.../sysml/api/ml/LogisticRegression.scala | 7 +++
.../org/apache/sysml/api/ml/NaiveBayes.scala | 7 +++
.../scala/org/apache/sysml/api/ml/SVM.scala | 7 +++
8 files changed, 145 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 3f11d3f..de8aeb9 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -186,7 +186,19 @@ class BaseSystemMLEstimator(Estimator):
self.X = None
self.y = None
return self
-
+
+ def fit_file(self, X_file, y_file):
+ global default_jvm_stdout, default_jvm_stdout_parallel_flush
+ try:
+ if default_jvm_stdout:
+ with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+ self.model = self.estimator.fit(X_file, y_file)
+ else:
+ self.model = self.estimator.fit(X_file, y_file)
+ except Py4JError:
+ traceback.print_exc()
+ return self
+
# Returns a model after calling fit(df) on Estimator object on JVM
def _fit(self, X):
"""
@@ -207,12 +219,14 @@ class BaseSystemMLEstimator(Estimator):
Parameters
----------
- X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
- y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
+ X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, Spark DataFrame, file path
+ y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, file path
"""
if y is None:
return self._fit(X)
- elif y is not None and isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES):
+ elif isinstance(X, str) and isinstance(y, str):
+ return self.fit_file(X, y)
+ elif isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES):
# Donot encode if y is a numpy matrix => useful for segmentation
skipEncodingY = len(y.shape) == 2 and y.shape[0] != 1 and y.shape[1] != 1
y = y if skipEncodingY else self.encode(y)
@@ -307,6 +321,8 @@ class BaseSystemMLEstimator(Estimator):
except AttributeError:
pass
try:
+ if isinstance(X, str):
+ return self.model.transform_probability(X)
jX = self._convertPythonXToJavaObject(X)
if default_jvm_stdout:
with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
@@ -323,7 +339,7 @@ class BaseSystemMLEstimator(Estimator):
Parameters
----------
- X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
+ X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame or file path
"""
global default_jvm_stdout, default_jvm_stdout_parallel_flush
try:
@@ -332,6 +348,8 @@ class BaseSystemMLEstimator(Estimator):
except AttributeError:
pass
try:
+ if isinstance(X, str):
+ return self.model.transform(X)
jX = self._convertPythonXToJavaObject(X)
if default_jvm_stdout:
with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index da72403..26e554f 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -206,6 +206,10 @@ class Caffe2DML(val sc: SparkContext,
val that = new Caffe2DML(sc, solverParam, solver, net, lrPolicy, numChannels, height, width)
copyValues(that, extra)
}
+ def fit(X_file: String, y_file: String): Caffe2DMLModel = {
+ mloutput = baseFit(X_file, y_file, sc)
+ new Caffe2DMLModel(this)
+ }
// Note: will update the y_mb as this will be called by Python mllearn
def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = {
mloutput = baseFit(X_mb, y_mb, sc)
@@ -822,6 +826,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
def baseEstimator(): BaseSystemMLEstimator = estimator
// Prediction
+ def transform(X_file: String): String =
+ if (estimator.isClassification) {
+ Caffe2DML.LOG.debug("Prediction assuming classification")
+ baseTransform(X_file, sc, "Prob")
+ } else {
+ Caffe2DML.LOG.debug("Prediction assuming segmentation")
+ val outShape = estimator.getOutputShapeOfLastLayer
+ baseTransform(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
+ }
def transform(X: MatrixBlock): MatrixBlock =
if (estimator.isClassification) {
Caffe2DML.LOG.debug("Prediction assuming classification")
@@ -831,6 +844,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
val outShape = estimator.getOutputShapeOfLastLayer
baseTransform(X, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
}
+ def transform_probability(X_file: String): String =
+ if (estimator.isClassification) {
+ Caffe2DML.LOG.debug("Prediction of probability assuming classification")
+ baseTransformProbability(X_file, sc, "Prob")
+ } else {
+ Caffe2DML.LOG.debug("Prediction of probability assuming segmentation")
+ val outShape = estimator.getOutputShapeOfLastLayer
+ baseTransformProbability(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
+ }
def transform_probability(X: MatrixBlock): MatrixBlock =
if (estimator.isClassification) {
Caffe2DML.LOG.debug("Prediction of probability assuming classification")
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index 97abe9e..5d22c46 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -111,6 +111,11 @@ trait HasRegParam extends Params {
}
trait BaseSystemMLEstimatorOrModel {
+ def dmlRead(X:String, fileX:String):String = {
+ val format = if(fileX.endsWith(".csv")) ", format=\"csv\"" else ""
+ return X + " = read(\"" + fileX + "\"" + format + "); "
+ }
+ def dmlWrite(X:String):String = "write("+ X + ", \"output.mtx\", format=\"binary\"); "
var enableGPU: Boolean = false
var forceGPU: Boolean = false
var explain: Boolean = false
@@ -215,6 +220,16 @@ trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel {
}
trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
+ def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = {
+ val isSingleNode = false
+ val ml = new MLContext(sc)
+ updateML(ml)
+ val readScript = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y")
+ val res = ml.execute(readScript)
+ val ret = getTrainingScript(isSingleNode)
+ val script = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y"))
+ ml.execute(script)
+ }
def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = {
val isSingleNode = true
val ml = new MLContext(sc)
@@ -242,7 +257,32 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
+ def baseTransform(X_file: String, sc: SparkContext, probVar: String): String = baseTransform(X_file, sc, probVar, -1, 1, 1)
def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1)
+
+ def baseTransform(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
+ val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
+ val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W); " + dmlWrite("Prediction"))
+ .in("Prob", Prob)
+ .in("C", C)
+ .in("H", H)
+ .in("W", W)
+ val ml = new MLContext(sc)
+ updateML(ml)
+ ml.execute(script1)
+ return "output.mtx"
+ }
+
+ def baseTransformHelper(X_file: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = {
+ val isSingleNode = true
+ val ml = new MLContext(sc)
+ updateML(ml)
+ val readScript = dml(dmlRead("X", X_file)).out("X")
+ val res = ml.execute(readScript)
+ val script = getPredictionScript(isSingleNode)
+ val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X")))
+ return modelPredict.getMatrix(probVar)
+ }
def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = {
val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
@@ -282,9 +322,18 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock =
baseTransformProbability(X, sc, probVar, -1, 1, 1)
+ def baseTransformProbability(X: String, sc: SparkContext, probVar: String): String =
+ baseTransformProbability(X, sc, probVar, -1, 1, 1)
+
def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock =
return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock
+ def baseTransformProbability(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
+ val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
+ (new MLContext(sc)).execute(dml(dmlWrite("Prob")).in("Prob", Prob))
+ "output.mtx"
+ }
+
def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean = true): DataFrame =
baseTransform(df, sc, probVar, outputProb, -1, 1, 1)
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
index d94655b..4731422 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -35,6 +35,17 @@ import org.apache.sysml.api.mlcontext.ScriptFactory._
trait BaseSystemMLRegressor extends BaseSystemMLEstimator {
+ def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = {
+ val isSingleNode = false
+ val ml = new MLContext(sc)
+ updateML(ml)
+ val readScript = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y")
+ val res = ml.execute(readScript)
+ val ret = getTrainingScript(isSingleNode)
+ val script = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y"))
+ ml.execute(script)
+ }
+
def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = {
val isSingleNode = true
val ml = new MLContext(sc)
@@ -61,6 +72,18 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator {
trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {
+ def baseTransform(X_file: String, sc: SparkContext, predictionVar: String): String = {
+ val isSingleNode = false
+ val ml = new MLContext(sc)
+ updateML(ml)
+ val readScript = dml(dmlRead("X", X_file)).out("X")
+ val res = ml.execute(readScript)
+ val script = getPredictionScript(isSingleNode)
+ val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X")))
+ val writeScript = dml(dmlWrite("X")).in("X", modelPredict.getMatrix(predictionVar))
+ ml.execute(writeScript)
+ return "output.mtx"
+ }
def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar: String): MatrixBlock = {
val isSingleNode = true
val ml = new MLContext(sc)
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
index b6f4966..ffb5d72 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
@@ -76,6 +76,11 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve
.out("beta_out")
(script, "X", "y")
}
+
+ def fit(X_file: String, y_file: String): LinearRegressionModel = {
+ mloutput = baseFit(X_file, y_file, sc)
+ new LinearRegressionModel(this)
+ }
def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = {
mloutput = baseFit(X_mb, y_mb, sc)
@@ -102,6 +107,7 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio
}
def transform_probability(X: MatrixBlock): MatrixBlock = throw new DMLRuntimeException("Unsupported method")
+ def transform_probability(X_file: String): String = throw new DMLRuntimeException("Unsupported method")
def baseEstimator(): BaseSystemMLEstimator = estimator
@@ -115,7 +121,6 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio
def modelVariables(): List[String] = List[String]("beta_out")
def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means")
-
def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means")
-
+ def transform(X_file: String): String = baseTransform(X_file, sc, "means")
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
index 98b6dd4..23ebcce 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
@@ -59,6 +59,11 @@ class LogisticRegression(override val uid: String, val sc: SparkContext)
val that = new LogisticRegression(uid, sc)
copyValues(that, extra)
}
+
+ def fit(X_file: String, y_file: String): LogisticRegressionModel = {
+ mloutput = baseFit(X_file, y_file, sc)
+ new LogisticRegressionModel(this)
+ }
// Note: will update the y_mb as this will be called by Python mllearn
def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = {
@@ -116,7 +121,9 @@ class LogisticRegressionModel(override val uid: String)(estimator: LogisticRegre
def modelVariables(): List[String] = List[String]("B_out")
def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means")
+ def transform(X: String): String = baseTransform(X, sc, "means")
def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "means")
+ def transform_probability(X: String): String = baseTransformProbability(X, sc, "means")
def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means")
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
index 8ecd4f0..43d1c53 100644
--- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
@@ -44,6 +44,11 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat
}
def setLaplace(value: Double) = set(laplace, value)
+ def fit(X_file: String, y_file: String): NaiveBayesModel = {
+ mloutput = baseFit(X_file, y_file, sc)
+ new NaiveBayesModel(this)
+ }
+
// Note: will update the y_mb as this will be called by Python mllearn
def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = {
mloutput = baseFit(X_mb, y_mb, sc)
@@ -108,7 +113,9 @@ class NaiveBayesModel(override val uid: String)(estimator: NaiveBayes, val sc: S
def baseEstimator(): BaseSystemMLEstimator = estimator
def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs")
+ def transform(X: String): String = baseTransform(X, sc, "probs")
def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "probs")
+ def transform_probability(X: String): String = baseTransformProbability(X, sc, "probs")
def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs")
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/SVM.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
index c4b7ae4..98ff81a 100644
--- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
@@ -69,6 +69,11 @@ class SVM(override val uid: String, val sc: SparkContext, val isMultiClass: Bool
.out("w")
(script, "X", "Y")
}
+
+ def fit(X_file: String, y_file: String): SVMModel = {
+ mloutput = baseFit(X_file, y_file, sc)
+ new SVMModel(this, isMultiClass)
+ }
// Note: will update the y_mb as this will be called by Python mllearn
def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = {
@@ -121,5 +126,7 @@ class SVMModel(override val uid: String)(estimator: SVM, val sc: SparkContext, v
def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores")
def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "scores")
+ def transform(X: String): String = baseTransform(X, sc, "scores")
+ def transform_probability(X: String): String = baseTransformProbability(X, sc, "scores")
def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores")
}