You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/04/21 09:14:20 UTC

spark git commit: [SPARK-5990] [MLLIB] Model import/export for IsotonicRegression

Repository: spark
Updated Branches:
  refs/heads/master ab9128fb7 -> 1f2f723b0


[SPARK-5990] [MLLIB] Model import/export for IsotonicRegression

Model import/export for IsotonicRegression

Author: Yanbo Liang <yb...@gmail.com>

Closes #5270 from yanboliang/spark-5990 and squashes the following commits:

872028d [Yanbo Liang] fix code style
f80ec1b [Yanbo Liang] address comments
49600cc [Yanbo Liang] address comments
429ff7d [Yanbo Liang] store each interval as a record
2b2f5a1 [Yanbo Liang] Model import/export for IsotonicRegression


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f2f723b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f2f723b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f2f723b

Branch: refs/heads/master
Commit: 1f2f723b0daacbb9e70ec42c19a84470af1d7765
Parents: ab9128f
Author: Yanbo Liang <yb...@gmail.com>
Authored: Tue Apr 21 00:14:16 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Tue Apr 21 00:14:16 2015 -0700

----------------------------------------------------------------------
 .../mllib/regression/IsotonicRegression.scala   | 78 +++++++++++++++++++-
 .../regression/IsotonicRegressionSuite.scala    | 21 ++++++
 2 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f2f723b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index cb70852..1d76170 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -23,9 +23,16 @@ import java.util.Arrays.binarySearch
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
+import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 /**
  * :: Experimental ::
@@ -42,7 +49,7 @@ import org.apache.spark.rdd.RDD
 class IsotonicRegressionModel (
     val boundaries: Array[Double],
     val predictions: Array[Double],
-    val isotonic: Boolean) extends Serializable {
+    val isotonic: Boolean) extends Serializable with Saveable {
 
   private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse
 
@@ -124,6 +131,75 @@ class IsotonicRegressionModel (
       predictions(foundIndex)
     }
   }
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
+  }
+
+  override protected def formatVersion: String = "1.0"
+}
+
+object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
+
+  import org.apache.spark.mllib.util.Loader._
+
+  private object SaveLoadV1_0 {
+
+    def thisFormatVersion: String = "1.0"
+
+    /** Hard-code class name string in case it changes in the future */
+    def thisClassName: String = "org.apache.spark.mllib.regression.IsotonicRegressionModel"
+
+    /** Model data for model import/export */
+    case class Data(boundary: Double, prediction: Double)
+
+    def save(
+        sc: SparkContext, 
+        path: String, 
+        boundaries: Array[Double], 
+        predictions: Array[Double], 
+        isotonic: Boolean): Unit = {
+      val sqlContext = new SQLContext(sc)
+
+      val metadata = compact(render(
+        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ 
+          ("isotonic" -> isotonic)))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+
+      sqlContext.createDataFrame(
+        boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) }
+      ).saveAsParquetFile(dataPath(path))
+    }
+
+    def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
+      val sqlContext = new SQLContext(sc)
+      val dataRDD = sqlContext.parquetFile(dataPath(path))
+
+      checkSchema[Data](dataRDD.schema)
+      val dataArray = dataRDD.select("boundary", "prediction").collect()
+      val (boundaries, predictions) = dataArray.map { x =>
+        (x.getDouble(0), x.getDouble(1))
+      }.toList.sortBy(_._1).unzip
+      (boundaries.toArray, predictions.toArray)
+    }
+  }
+
+  override def load(sc: SparkContext, path: String): IsotonicRegressionModel = {
+    implicit val formats = DefaultFormats
+    val (loadedClassName, version, metadata) = loadMetadata(sc, path)
+    val isotonic =  (metadata \ "isotonic").extract[Boolean]
+    val classNameV1_0 = SaveLoadV1_0.thisClassName
+    (loadedClassName, version) match {
+      case (className, "1.0") if className == classNameV1_0 =>
+        val (boundaries, predictions) = SaveLoadV1_0.load(sc, path)
+        new IsotonicRegressionModel(boundaries, predictions, isotonic)
+      case _ => throw new Exception(
+        s"IsotonicRegressionModel.load did not recognize model with (className, format version):" +
+        s"($loadedClassName, $version).  Supported:\n" +
+        s"  ($classNameV1_0, 1.0)"
+      )
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1f2f723b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 7ef4524..8e12340 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.{Matchers, FunSuite}
 
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.util.Utils
 
 class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {
 
@@ -73,6 +74,26 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
     assert(model.isotonic)
   }
 
+  test("model save/load") {
+    val boundaries = Array(0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0)
+    val predictions = Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0)
+    val model = new IsotonicRegressionModel(boundaries, predictions, true)
+
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    // Save model, load it back, and compare.
+    try {
+      model.save(sc, path)
+      val sameModel = IsotonicRegressionModel.load(sc, path)
+      assert(model.boundaries === sameModel.boundaries)
+      assert(model.predictions === sameModel.predictions)
+      assert(model.isotonic === model.isotonic)
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
   test("isotonic regression with size 0") {
     val model = runIsotonicRegression(Seq(), true)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org