You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/07/31 00:57:17 UTC

spark git commit: [SPARK-8671] [ML] Added isotonic regression to the pipeline API.

Repository: spark
Updated Branches:
  refs/heads/master 0dbd6963d -> 7f7a319c4


[SPARK-8671] [ML] Added isotonic regression to the pipeline API.

Author: martinzapletal <za...@email.cz>

Closes #7517 from zapletal-martin/SPARK-8671-isotonic-regression-api and squashes the following commits:

8c435c1 [martinzapletal] Review https://github.com/apache/spark/pull/7517 feedback update.
bebbb86 [martinzapletal] Merge remote-tracking branch 'upstream/master' into SPARK-8671-isotonic-regression-api
b68efc0 [martinzapletal] Added tests for param validation.
07c12bd [martinzapletal] Comments and refactoring.
834fcf7 [martinzapletal] Merge remote-tracking branch 'upstream/master' into SPARK-8671-isotonic-regression-api
b611fee [martinzapletal] SPARK-8671. Added first version of isotonic regression to pipeline API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7a319c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7a319c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7a319c

Branch: refs/heads/master
Commit: 7f7a319c4ce07f07a6bd68100cf0a4f1da66269e
Parents: 0dbd696
Author: martinzapletal <za...@email.cz>
Authored: Thu Jul 30 15:57:14 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Jul 30 15:57:14 2015 -0700

----------------------------------------------------------------------
 .../ml/regression/IsotonicRegression.scala      | 144 ++++++++++++++++++
 .../ml/regression/IsotonicRegressionSuite.scala | 148 +++++++++++++++++++
 2 files changed, 292 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7f7a319c/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
new file mode 100644
index 0000000..4ece8cf
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.regression
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam}
+import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
+import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
+import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DoubleType, DataType}
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Parameters shared by the isotonic regression estimator and its fitted model.
+ */
+private[regression] trait IsotonicRegressionParams extends PredictorParams {
+
+  /**
+   * Name of the column holding the per-row weights.
+   * TODO: Move weightCol to sharedParams.
+   *
+   * @group param
+   */
+  final val weightCol: Param[String] =
+    new Param[String](this, "weightCol", "weight column name")
+
+  /** @group getParam */
+  final def getWeightCol: String = getOrDefault(weightCol)
+
+  /**
+   * Direction of the fitted sequence:
+   * isotonic (increasing) when true, antitonic (decreasing) when false.
+   * @group param
+   */
+  final val isotonic: BooleanParam =
+    new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic (decreasing) sequence")
+
+  /** @group getParam */
+  final def getIsotonicParam: Boolean = getOrDefault(isotonic)
+}
+
+/**
+ * :: Experimental ::
+ * Isotonic regression.
+ *
+ * Currently implemented using parallelized pool adjacent violators algorithm.
+ * Only univariate (single feature) algorithm supported.
+ *
+ * Uses [[org.apache.spark.mllib.regression.IsotonicRegression]].
+ */
+@Experimental
+class IsotonicRegression(override val uid: String)
+  extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel]
+  with IsotonicRegressionParams {
+
+  def this() = this(Identifiable.randomUID("isoReg"))
+
+  /**
+   * Set the isotonic parameter (true = increasing, false = decreasing). Default is true.
+   * @group setParam
+   */
+  def setIsotonicParam(value: Boolean): this.type = set(isotonic, value)
+  setDefault(isotonic -> true)
+
+  /**
+   * Set the weight column name; the column must be of DoubleType. Default is "weight".
+   * @group setParam
+   */
+  def setWeightParam(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "weight")
+
+  override private[ml] def featuresDataType: DataType = DoubleType
+
+  override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+
+  /** Selects the label, feature and weight columns as (label, feature, weight) triples. */
+  private[this] def extractWeightedLabeledPoints(
+      dataset: DataFrame): RDD[(Double, Double, Double)] = {
+    dataset.select($(labelCol), $(featuresCol), $(weightCol))
+      .map { case Row(label: Double, features: Double, weights: Double) =>
+        (label, features, weights)
+      }
+  }
+
+  /** Fits the MLlib implementation on the weighted points and wraps the resulting model. */
+  override protected def train(dataset: DataFrame): IsotonicRegressionModel = {
+    SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType)
+    // Cache the extracted points only if the input is not persisted; released in finally below.
+    val instances = extractWeightedLabeledPoints(dataset)
+    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+    val parentModel = try {
+      new MLlibIsotonicRegression().setIsotonic($(isotonic)).run(instances)
+    } finally {
+      if (handlePersistence) instances.unpersist()
+    }
+    new IsotonicRegressionModel(uid, parentModel)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Model fitted by IsotonicRegression.
+ * Predicts using a piecewise linear function.
+ *
+ * For detailed rules see [[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]].
+ *
+ * @param parentModel A [[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
+ *                    model trained by [[org.apache.spark.mllib.regression.IsotonicRegression]].
+ */
+@Experimental
+class IsotonicRegressionModel private[ml] (
+    override val uid: String,
+    private[ml] val parentModel: MLlibIsotonicRegressionModel)
+  extends RegressionModel[Double, IsotonicRegressionModel]
+  with IsotonicRegressionParams {
+
+  override def featuresDataType: DataType = DoubleType
+
+  /** Delegates the prediction for a single feature value to the wrapped MLlib model. */
+  override protected def predict(features: Double): Double = parentModel.predict(features)
+
+  /** Copies the model and its params, keeping the link to the parent estimator. */
+  override def copy(extra: ParamMap): IsotonicRegressionModel =
+    copyValues(new IsotonicRegressionModel(uid, parentModel), extra).setParent(parent)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f7a319c/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
new file mode 100644
index 0000000..66e4b17
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.regression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row}
+
+class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
+  // Training frames carry a label, one double feature and a weight per row.
+  private val schema = StructType(
+    Array(
+      StructField("label", DoubleType),
+      StructField("features", DoubleType),
+      StructField("weight", DoubleType)))
+
+  // Prediction-only frames carry just the feature column.
+  private val predictionSchema = StructType(Array(StructField("features", DoubleType)))
+
+  /**
+   * Builds a training frame in which row i holds the i-th label, feature i and weight 1.
+   */
+  private def generateIsotonicInput(labels: Seq[Double]): DataFrame = {
+    val rows = labels.zipWithIndex.map { case (label, index) => Row(label, index.toDouble, 1d) }
+    sqlContext.createDataFrame(sc.parallelize(rows), schema)
+  }
+
+  /**
+   * Builds a single-column frame holding the given feature values.
+   */
+  private def generatePredictionInput(features: Seq[Double]): DataFrame = {
+    val rows = features.map(feature => Row(feature))
+    sqlContext.createDataFrame(sc.parallelize(rows), predictionSchema)
+  }
+
+  /** Applies the model to the frame and collects its "prediction" column. */
+  private def collectPredictions(model: IsotonicRegressionModel, input: DataFrame): Array[Any] = {
+    model
+      .transform(input)
+      .select("prediction")
+      .map(_.get(0))
+      .collect()
+  }
+
+  test("isotonic regression predictions") {
+    val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18))
+    val model = new IsotonicRegression().setIsotonicParam(true).fit(dataset)
+
+    assert(collectPredictions(model, dataset) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
+
+    // Inspect the wrapped MLlib model as well.
+    val fitted = model.parentModel
+    assert(fitted.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
+    assert(fitted.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
+    assert(fitted.isotonic)
+  }
+
+  test("antitonic regression predictions") {
+    val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1))
+    val model = new IsotonicRegression().setIsotonicParam(false).fit(dataset)
+
+    // Query points below, inside and above the training feature range 0..4.
+    val features = generatePredictionInput(Seq(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0))
+
+    assert(collectPredictions(model, features) === Array(7, 7, 6, 5.5, 5, 4, 1))
+  }
+
+  test("params validation") {
+    val dataset = generateIsotonicInput(Seq(1, 2, 3))
+    val estimator = new IsotonicRegression
+    ParamsSuite.checkParams(estimator)
+
+    val model = estimator.fit(dataset)
+    ParamsSuite.checkParams(model)
+  }
+
+  test("default params") {
+    val dataset = generateIsotonicInput(Seq(1, 2, 3))
+    val estimator = new IsotonicRegression()
+    assert(estimator.getLabelCol === "label")
+    assert(estimator.getFeaturesCol === "features")
+    assert(estimator.getWeightCol === "weight")
+    assert(estimator.getPredictionCol === "prediction")
+    assert(estimator.getIsotonicParam === true)
+
+    val model = estimator.fit(dataset)
+    // The result is discarded: this only checks that all four columns can be selected.
+    model.transform(dataset)
+      .select("label", "features", "prediction", "weight")
+      .collect()
+
+    assert(model.getLabelCol === "label")
+    assert(model.getFeaturesCol === "features")
+    assert(model.getWeightCol === "weight")
+    assert(model.getPredictionCol === "prediction")
+    assert(model.getIsotonicParam === true)
+    assert(model.hasParent)
+  }
+
+  test("set parameters") {
+    val configured = new IsotonicRegression()
+      .setIsotonicParam(false)
+      .setWeightParam("w")
+      .setFeaturesCol("f")
+      .setLabelCol("l")
+      .setPredictionCol("p")
+
+    assert(configured.getIsotonicParam === false)
+    assert(configured.getWeightCol === "w")
+    assert(configured.getFeaturesCol === "f")
+    assert(configured.getLabelCol === "l")
+    assert(configured.getPredictionCol === "p")
+  }
+
+  test("missing column") {
+    val dataset = generateIsotonicInput(Seq(1, 2, 3))
+
+    // Every call below refers to a column ("w", "f" or "l") that the dataset does not have.
+    val failingCalls: Seq[() => Any] = Seq(
+      () => new IsotonicRegression().setWeightParam("w").fit(dataset),
+      () => new IsotonicRegression().setFeaturesCol("f").fit(dataset),
+      () => new IsotonicRegression().setLabelCol("l").fit(dataset),
+      () => new IsotonicRegression().fit(dataset).setFeaturesCol("f").transform(dataset))
+
+    failingCalls.foreach { call =>
+      intercept[IllegalArgumentException] {
+        call()
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org