Posted to commits@spark.apache.org by me...@apache.org on 2015/05/06 07:57:17 UTC

spark git commit: [SPARK-6267] [MLLIB] Python API for IsotonicRegression

Repository: spark
Updated Branches:
  refs/heads/master ba2b56614 -> 7b1457839


[SPARK-6267] [MLLIB] Python API for IsotonicRegression

https://issues.apache.org/jira/browse/SPARK-6267
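
A minimal usage sketch of the new Python API, mirroring the doctest added to
python/pyspark/mllib/regression.py in this commit (assumes an active SparkContext named sc):

    from pyspark.mllib.regression import IsotonicRegression

    # Each training record is a (label, feature, weight) tuple.
    data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
    model = IsotonicRegression.train(sc.parallelize(data))
    model.predict(3)                                 # 2.0
    model.predict(sc.parallelize([3, 5])).collect()  # [2.0, 16.5]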

Author: Yanbo Liang <yb...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #5890 from yanboliang/spark-6267 and squashes the following commits:

f20541d [Yanbo Liang] Merge pull request #3 from mengxr/SPARK-6267
7f202f9 [Xiangrui Meng] use Vector to have the best Python 2&3 compatibility
4bccfee [Yanbo Liang] fix doctest
ec09412 [Yanbo Liang] fix typos
8214bbb [Yanbo Liang] fix code style
5c8ebe5 [Yanbo Liang] Python API for IsotonicRegression


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b145783
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b145783
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b145783

Branch: refs/heads/master
Commit: 7b1457839bdac124a07fd6292f6263f0ded48880
Parents: ba2b566
Author: Yanbo Liang <yb...@gmail.com>
Authored: Tue May 5 22:57:13 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue May 5 22:57:13 2015 -0700

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 18 +++++
 .../mllib/regression/IsotonicRegression.scala   | 19 ++++-
 python/pyspark/mllib/regression.py              | 73 +++++++++++++++++++-
 3 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b145783/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index b086cec..426306d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -283,6 +283,24 @@ private[python] class PythonMLLibAPI extends Serializable {
   }
 
   /**
+   * Java stub for Python mllib IsotonicRegression.run()
+   */
+  def trainIsotonicRegressionModel(
+      data: JavaRDD[Vector],
+      isotonic: Boolean): JList[Object] = {
+    val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
+    val input = data.rdd.map { x =>
+      (x(0), x(1), x(2))
+    }.persist(StorageLevel.MEMORY_AND_DISK)
+    try {
+      val model = isotonicRegressionAlg.run(input)
+      List[AnyRef](model.boundaryVector, model.predictionVector).asJava
+    } finally {
+      input.unpersist(blocking = false)
+    }
+  }
+
+  /**
    * Java stub for Python mllib KMeans.run()
    */
   def trainKMeansModel(
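
The stub above unpacks each incoming Vector as (label, feature, weight) via x(0), x(1), x(2), so the
Python caller is expected to pack each tuple into a length-3 dense vector before crossing the Py4J
bridge. A rough sketch of that call path, restating what IsotonicRegression.train in the Python diff
below does (callMLlibFunc and _convert_to_vector are the existing helpers in pyspark.mllib.common
and pyspark.mllib.linalg):

    from pyspark.mllib.common import callMLlibFunc
    from pyspark.mllib.linalg import _convert_to_vector

    def _train_isotonic(data, isotonic=True):
        # data: RDD of (label, feature, weight) tuples; each tuple is converted to a
        # DenseVector so the Scala stub can read it back as x(0), x(1), x(2).
        boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
                                                data.map(_convert_to_vector), bool(isotonic))
        return boundaries.toArray(), predictions.toArray()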

http://git-wip-us.apache.org/repos/asf/spark/blob/7b145783/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 1d76170..be2a00c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -21,18 +21,20 @@ import java.io.Serializable
 import java.lang.{Double => JDouble}
 import java.util.Arrays.binarySearch
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.SQLContext
 
 /**
  * :: Experimental ::
@@ -57,6 +59,13 @@ class IsotonicRegressionModel (
   assertOrdered(boundaries)
   assertOrdered(predictions)(predictionOrd)
 
+  /** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */
+  def this(boundaries: java.lang.Iterable[Double],
+      predictions: java.lang.Iterable[Double],
+      isotonic: java.lang.Boolean) = {
+    this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
+  }
+
   /** Asserts the input array is monotone with the given ordering. */
   private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
     var i = 1
@@ -132,6 +141,12 @@ class IsotonicRegressionModel (
     }
   }
 
+  /** A convenience method for boundaries, called by the Python API. */
+  private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)
+
+  /** A convenience method for predictions, called by the Python API. */
+  private[mllib] def predictionVector: Vector = Vectors.dense(predictions)
+
   override def save(sc: SparkContext, path: String): Unit = {
     IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
   }
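
The new Java-friendly constructor and the boundaryVector/predictionVector accessors are the two ends
of the Python save/load round trip: save hands plain Java lists of doubles to the constructor, while
load reads the arrays back out as vectors. A condensed restatement of the save/load methods from the
Python diff below (boundaries, predictions, isotonic, sc and path are assumed to be in scope, as they
are inside the model's methods):

    # save: numpy arrays -> Java lists -> new Java-friendly constructor
    java_boundaries = _py2java(sc, boundaries.tolist())
    java_predictions = _py2java(sc, predictions.tolist())
    java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
        java_boundaries, java_predictions, isotonic)
    java_model.save(sc._jsc.sc(), path)

    # load: boundaryVector()/predictionVector() -> pyspark DenseVector -> numpy array
    java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
        sc._jsc.sc(), path)
    py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
    py_predictions = _java2py(sc, java_model.predictionVector()).toArray()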

http://git-wip-us.apache.org/repos/asf/spark/blob/7b145783/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 4bc6351..41bde2c 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -18,14 +18,16 @@
 import numpy as np
 from numpy import array
 
+from pyspark import RDD
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
 from pyspark.mllib.util import Saveable, Loader
 
 __all__ = ['LabeledPoint', 'LinearModel',
            'LinearRegressionModel', 'LinearRegressionWithSGD',
            'RidgeRegressionModel', 'RidgeRegressionWithSGD',
-           'LassoModel', 'LassoWithSGD']
+           'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
+           'IsotonicRegression']
 
 
 class LabeledPoint(object):
@@ -396,6 +398,73 @@ class RidgeRegressionWithSGD(object):
         return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
 
 
+class IsotonicRegressionModel(Saveable, Loader):
+
+    """Regression model for isotonic regression.
+
+    >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
+    >>> irm = IsotonicRegression.train(sc.parallelize(data))
+    >>> irm.predict(3)
+    2.0
+    >>> irm.predict(5)
+    16.5
+    >>> irm.predict(sc.parallelize([3, 5])).collect()
+    [2.0, 16.5]
+    >>> import os, tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> irm.save(sc, path)
+    >>> sameModel = IsotonicRegressionModel.load(sc, path)
+    >>> sameModel.predict(3)
+    2.0
+    >>> sameModel.predict(5)
+    16.5
+    >>> try:
+    ...     os.removedirs(path)
+    ... except OSError:
+    ...     pass
+    """
+
+    def __init__(self, boundaries, predictions, isotonic):
+        self.boundaries = boundaries
+        self.predictions = predictions
+        self.isotonic = isotonic
+
+    def predict(self, x):
+        if isinstance(x, RDD):
+            return x.map(lambda v: self.predict(v))
+        return np.interp(x, self.boundaries, self.predictions)
+
+    def save(self, sc, path):
+        java_boundaries = _py2java(sc, self.boundaries.tolist())
+        java_predictions = _py2java(sc, self.predictions.tolist())
+        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
+            java_boundaries, java_predictions, self.isotonic)
+        java_model.save(sc._jsc.sc(), path)
+
+    @classmethod
+    def load(cls, sc, path):
+        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
+            sc._jsc.sc(), path)
+        py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
+        py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
+        return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)
+
+
+class IsotonicRegression(object):
+    """
+    Run the IsotonicRegression algorithm to fit an isotonic regression model.
+
+    :param data:            RDD of (label, feature, weight) tuples.
+    :param isotonic:        Whether the output is isotonic (increasing) or antitonic (decreasing).
+    """
+    @classmethod
+    def train(cls, data, isotonic=True):
+        """Train a isotonic regression model on the given data."""
+        boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
+                                                data.map(_convert_to_vector), bool(isotonic))
+        return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
+
+
 def _test():
     import doctest
     from pyspark import SparkContext

