You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2018/02/27 21:26:21 UTC

systemml git commit: [SYSTEMML-445] Allow users to pass the file paths to the binary blocked, csv and ijv datasets to mllearn classes.

Repository: systemml
Updated Branches:
  refs/heads/master 54a11eed3 -> 8ffa3d158


[SYSTEMML-445] Allow users to pass the file paths to the binary blocked, csv and ijv datasets to mllearn classes.

- This allows the advance users who already have data materialized in
  binary blocked formats to avoid conversion overhead.
- Also, this facility is useful for benchmarking the performance of
  Keras2DML and Caffe2DML.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8ffa3d15
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8ffa3d15
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8ffa3d15

Branch: refs/heads/master
Commit: 8ffa3d158fe97e0871bcd4b77fa27504b8b85502
Parents: 54a11ee
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Tue Feb 27 13:21:32 2018 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Tue Feb 27 13:21:32 2018 -0800

----------------------------------------------------------------------
 src/main/python/systemml/mllearn/estimators.py  | 28 +++++++++--
 .../org/apache/sysml/api/dl/Caffe2DML.scala     | 22 +++++++++
 .../sysml/api/ml/BaseSystemMLClassifier.scala   | 49 ++++++++++++++++++++
 .../sysml/api/ml/BaseSystemMLRegressor.scala    | 23 +++++++++
 .../apache/sysml/api/ml/LinearRegression.scala  |  9 +++-
 .../sysml/api/ml/LogisticRegression.scala       |  7 +++
 .../org/apache/sysml/api/ml/NaiveBayes.scala    |  7 +++
 .../scala/org/apache/sysml/api/ml/SVM.scala     |  7 +++
 8 files changed, 145 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 3f11d3f..de8aeb9 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -186,7 +186,19 @@ class BaseSystemMLEstimator(Estimator):
         self.X = None
         self.y = None
         return self
-        
+
+    def fit_file(self, X_file, y_file):
+        global default_jvm_stdout, default_jvm_stdout_parallel_flush
+        try:
+            if default_jvm_stdout:
+                with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
+                    self.model = self.estimator.fit(X_file, y_file)
+            else:
+                self.model = self.estimator.fit(X_file, y_file)
+        except Py4JError:
+            traceback.print_exc()
+        return self
+                
     # Returns a model after calling fit(df) on Estimator object on JVM
     def _fit(self, X):
         """
@@ -207,12 +219,14 @@ class BaseSystemMLEstimator(Estimator):
 
         Parameters
         ----------
-        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
-        y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix
+        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, Spark DataFrame, file path
+        y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix, file path
         """
         if y is None:
             return self._fit(X)
-        elif y is not None and isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES):
+        elif isinstance(X, str) and isinstance(y, str):
+            return self.fit_file(X, y)
+        elif isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES):
             # Donot encode if y is a numpy matrix => useful for segmentation
             skipEncodingY = len(y.shape) == 2 and y.shape[0] != 1 and y.shape[1] != 1
             y = y if skipEncodingY else self.encode(y)
@@ -307,6 +321,8 @@ class BaseSystemMLEstimator(Estimator):
         except AttributeError:
             pass
         try:
+            if isinstance(X, str):
+                return self.model.transform_probability(X)
             jX = self._convertPythonXToJavaObject(X)
             if default_jvm_stdout:
                 with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):
@@ -323,7 +339,7 @@ class BaseSystemMLEstimator(Estimator):
 
         Parameters
         ----------
-        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame
+        X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix or PySpark DataFrame or file path
         """
         global default_jvm_stdout, default_jvm_stdout_parallel_flush
         try:
@@ -332,6 +348,8 @@ class BaseSystemMLEstimator(Estimator):
         except AttributeError:
             pass
         try:
+            if isinstance(X, str):
+                return self.model.transform(X)
             jX = self._convertPythonXToJavaObject(X)
             if default_jvm_stdout:
                 with jvm_stdout(parallel_flush=default_jvm_stdout_parallel_flush):

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index da72403..26e554f 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -206,6 +206,10 @@ class Caffe2DML(val sc: SparkContext,
     val that = new Caffe2DML(sc, solverParam, solver, net, lrPolicy, numChannels, height, width)
     copyValues(that, extra)
   }
+  def fit(X_file: String, y_file: String): Caffe2DMLModel = {
+    mloutput = baseFit(X_file, y_file, sc)
+    new Caffe2DMLModel(this)
+  }
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): Caffe2DMLModel = {
     mloutput = baseFit(X_mb, y_mb, sc)
@@ -822,6 +826,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
   def baseEstimator(): BaseSystemMLEstimator = estimator
 
   // Prediction
+  def transform(X_file: String): String =
+    if (estimator.isClassification) {
+      Caffe2DML.LOG.debug("Prediction assuming classification")
+      baseTransform(X_file, sc, "Prob")
+    } else {
+      Caffe2DML.LOG.debug("Prediction assuming segmentation")
+      val outShape = estimator.getOutputShapeOfLastLayer
+      baseTransform(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
+    }
   def transform(X: MatrixBlock): MatrixBlock =
     if (estimator.isClassification) {
       Caffe2DML.LOG.debug("Prediction assuming classification")
@@ -831,6 +844,15 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
       val outShape = estimator.getOutputShapeOfLastLayer
       baseTransform(X, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
     }
+  def transform_probability(X_file: String): String =
+    if (estimator.isClassification) {
+      Caffe2DML.LOG.debug("Prediction of probability assuming classification")
+      baseTransformProbability(X_file, sc, "Prob")
+    } else {
+      Caffe2DML.LOG.debug("Prediction of probability assuming segmentation")
+      val outShape = estimator.getOutputShapeOfLastLayer
+      baseTransformProbability(X_file, sc, "Prob", outShape._1.toInt, outShape._2.toInt, outShape._3.toInt)
+    }
   def transform_probability(X: MatrixBlock): MatrixBlock =
     if (estimator.isClassification) {
       Caffe2DML.LOG.debug("Prediction of probability assuming classification")

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index 97abe9e..5d22c46 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -111,6 +111,11 @@ trait HasRegParam extends Params {
 }
 
 trait BaseSystemMLEstimatorOrModel {
+  def dmlRead(X:String, fileX:String):String = {
+    val format = if(fileX.endsWith(".csv")) ", format=\"csv\"" else ""
+	return X + " = read(\"" + fileX + "\"" + format + "); "
+  }
+  def dmlWrite(X:String):String = "write("+ X + ", \"output.mtx\", format=\"binary\"); "
   var enableGPU: Boolean                                                                          = false
   var forceGPU: Boolean                                                                           = false
   var explain: Boolean                                                                            = false
@@ -215,6 +220,16 @@ trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel {
 }
 
 trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
+  def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = {
+	val isSingleNode = false
+	val ml           = new MLContext(sc)
+	updateML(ml)
+	val readScript      = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y")
+	val res = ml.execute(readScript)
+	val ret             = getTrainingScript(isSingleNode)
+	val script          = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y"))
+	ml.execute(script)
+  }
   def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = {
     val isSingleNode = true
     val ml           = new MLContext(sc)
@@ -242,7 +257,32 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
 
 trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
 
+  def baseTransform(X_file: String, sc: SparkContext, probVar: String): String = baseTransform(X_file, sc, probVar, -1, 1, 1)
   def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1)
+ 
+  def baseTransform(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
+    val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
+    val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W); " + dmlWrite("Prediction"))
+      .in("Prob", Prob)
+      .in("C", C)
+      .in("H", H)
+      .in("W", W)
+    val ml           = new MLContext(sc)
+    updateML(ml)
+    ml.execute(script1)
+    return "output.mtx"
+  }
+
+  def baseTransformHelper(X_file: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = {
+    val isSingleNode = true
+    val ml           = new MLContext(sc)
+    updateML(ml)
+    val readScript      = dml(dmlRead("X", X_file)).out("X")
+	val res = ml.execute(readScript)
+    val script = getPredictionScript(isSingleNode)
+    val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X")))
+    return modelPredict.getMatrix(probVar)
+  }
 
   def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = {
     val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
@@ -282,9 +322,18 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
   def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock =
     baseTransformProbability(X, sc, probVar, -1, 1, 1)
 
+  def baseTransformProbability(X: String, sc: SparkContext, probVar: String): String =
+    baseTransformProbability(X, sc, probVar, -1, 1, 1)
+    
   def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock =
     return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock
 
+  def baseTransformProbability(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
+	val Prob =  baseTransformHelper(X, sc, probVar, C, H, W)
+    (new MLContext(sc)).execute(dml(dmlWrite("Prob")).in("Prob", Prob))
+    "output.mtx"
+  }
+    	    		
   def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean = true): DataFrame =
     baseTransform(df, sc, probVar, outputProb, -1, 1, 1)
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
index d94655b..4731422 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -35,6 +35,17 @@ import org.apache.sysml.api.mlcontext.ScriptFactory._
 
 trait BaseSystemMLRegressor extends BaseSystemMLEstimator {
 
+  def baseFit(X_file: String, y_file: String, sc: SparkContext): MLResults = {
+    val isSingleNode = false
+    val ml           = new MLContext(sc)
+    updateML(ml)
+    val readScript      = dml(dmlRead("X", X_file) + dmlRead("y", y_file)).out("X", "y")
+	  val res = ml.execute(readScript)
+    val ret    = getTrainingScript(isSingleNode)
+    val script = ret._1.in(ret._2, res.getMatrix("X")).in(ret._3, res.getMatrix("y"))
+    ml.execute(script)
+  }
+  
   def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = {
     val isSingleNode = true
     val ml           = new MLContext(sc)
@@ -61,6 +72,18 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator {
 
 trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {
 
+  def baseTransform(X_file: String, sc: SparkContext, predictionVar: String): String = {
+    val isSingleNode = false
+    val ml           = new MLContext(sc)
+    updateML(ml)
+    val readScript      = dml(dmlRead("X", X_file)).out("X")
+	  val res = ml.execute(readScript)
+    val script       = getPredictionScript(isSingleNode)
+    val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X")))
+    val writeScript  = dml(dmlWrite("X")).in("X", modelPredict.getMatrix(predictionVar))
+	  ml.execute(writeScript)
+    return "output.mtx"
+  }
   def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar: String): MatrixBlock = {
     val isSingleNode = true
     val ml           = new MLContext(sc)

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
index b6f4966..ffb5d72 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala
@@ -76,6 +76,11 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve
       .out("beta_out")
     (script, "X", "y")
   }
+  
+  def fit(X_file: String, y_file: String): LinearRegressionModel = {
+    mloutput = baseFit(X_file, y_file, sc)
+    new LinearRegressionModel(this)
+  }
 
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = {
     mloutput = baseFit(X_mb, y_mb, sc)
@@ -102,6 +107,7 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio
   }
 
   def transform_probability(X: MatrixBlock): MatrixBlock = throw new DMLRuntimeException("Unsupported method")
+  def transform_probability(X_file: String): String      = throw new DMLRuntimeException("Unsupported method")
 
   def baseEstimator(): BaseSystemMLEstimator = estimator
 
@@ -115,7 +121,6 @@ class LinearRegressionModel(override val uid: String)(estimator: LinearRegressio
   def modelVariables(): List[String] = List[String]("beta_out")
 
   def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means")
-
   def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means")
-
+  def transform(X_file: String): String     = baseTransform(X_file, sc, "means")
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
index 98b6dd4..23ebcce 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
@@ -59,6 +59,11 @@ class LogisticRegression(override val uid: String, val sc: SparkContext)
     val that = new LogisticRegression(uid, sc)
     copyValues(that, extra)
   }
+  
+  def fit(X_file: String, y_file: String): LogisticRegressionModel = {
+    mloutput = baseFit(X_file, y_file, sc)
+    new LogisticRegressionModel(this)
+  }
 
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = {
@@ -116,7 +121,9 @@ class LogisticRegressionModel(override val uid: String)(estimator: LogisticRegre
   def modelVariables(): List[String]         = List[String]("B_out")
 
   def transform(X: MatrixBlock): MatrixBlock               = baseTransform(X, sc, "means")
+  def transform(X: String): String                         = baseTransform(X, sc, "means")
   def transform_probability(X: MatrixBlock): MatrixBlock   = baseTransformProbability(X, sc, "means")
+  def transform_probability(X: String): String             = baseTransformProbability(X, sc, "means")
   def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means")
 }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
index 8ecd4f0..43d1c53 100644
--- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala
@@ -44,6 +44,11 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat
   }
   def setLaplace(value: Double) = set(laplace, value)
 
+  def fit(X_file: String, y_file: String): NaiveBayesModel = {
+    mloutput = baseFit(X_file, y_file, sc)
+    new NaiveBayesModel(this)
+  }
+  
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = {
     mloutput = baseFit(X_mb, y_mb, sc)
@@ -108,7 +113,9 @@ class NaiveBayesModel(override val uid: String)(estimator: NaiveBayes, val sc: S
 
   def baseEstimator(): BaseSystemMLEstimator               = estimator
   def transform(X: MatrixBlock): MatrixBlock               = baseTransform(X, sc, "probs")
+  def transform(X: String): String                         = baseTransform(X, sc, "probs")
   def transform_probability(X: MatrixBlock): MatrixBlock   = baseTransformProbability(X, sc, "probs")
+  def transform_probability(X: String): String             = baseTransformProbability(X, sc, "probs")
   def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs")
 
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8ffa3d15/src/main/scala/org/apache/sysml/api/ml/SVM.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
index c4b7ae4..98ff81a 100644
--- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala
@@ -69,6 +69,11 @@ class SVM(override val uid: String, val sc: SparkContext, val isMultiClass: Bool
       .out("w")
     (script, "X", "Y")
   }
+  
+  def fit(X_file: String, y_file: String): SVMModel = {
+    mloutput = baseFit(X_file, y_file, sc)
+    new SVMModel(this, isMultiClass)
+  }
 
   // Note: will update the y_mb as this will be called by Python mllearn
   def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = {
@@ -121,5 +126,7 @@ class SVMModel(override val uid: String)(estimator: SVM, val sc: SparkContext, v
 
   def transform(X: MatrixBlock): MatrixBlock               = baseTransform(X, sc, "scores")
   def transform_probability(X: MatrixBlock): MatrixBlock   = baseTransformProbability(X, sc, "scores")
+  def transform(X: String): String                         = baseTransform(X, sc, "scores")
+  def transform_probability(X: String): String             = baseTransformProbability(X, sc, "scores")
   def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores")
 }