You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2017/01/23 21:20:56 UTC

spark git commit: [SPARK-17455][MLLIB] Improve PAVA implementation in IsotonicRegression

Repository: spark
Updated Branches:
  refs/heads/master 4a11d029d -> c8aea7445


[SPARK-17455][MLLIB] Improve PAVA implementation in IsotonicRegression

## What changes were proposed in this pull request?

New implementation of the Pool Adjacent Violators Algorithm (PAVA) in mllib.IsotonicRegression, which used under the hood by ml.regression.IsotonicRegression. The previous implementation could have factorial complexity in the worst case. This implementation, which closely follows those in scikit-learn and the R `iso` package, runs in quadratic time in the worst case.
## How was this patch tested?

Existing unit tests in both `mllib` and `ml` passed before and after this patch. Scaling properties were tested by running the `poolAdjacentViolators` method in [scala-benchmarking-template](https://github.com/sirthias/scala-benchmarking-template) with the input generated by

``` scala
val x = (1 to length).toArray.map(_.toDouble)
val y = x.reverse.zipWithIndex.map{ case (yi, i) => if (i % 2 == 1) yi - 1.5 else yi}
val w = Array.fill(length)(1d)

val input: Array[(Double, Double, Double)] = (y zip x zip w) map{ case ((y, x), w) => (y, x, w)}
```

Before this patch:

| Input Length | Time (us) |
| --: | --: |
| 100 | 1.35 |
| 200 | 3.14 |
| 400 | 116.10 |
| 800 | 2134225.90 |

After this patch:

| Input Length | Time (us) |
| --: | --: |
| 100 | 1.25 |
| 200 | 2.53 |
| 400 | 5.86 |
| 800 | 10.55 |

Benchmarking was also performed with randomly-generated y values, with similar results.

Author: z001qdp <Ni...@target.com>

Closes #15018 from neggert/SPARK-17455-isoreg-algo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8aea744
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8aea744
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8aea744

Branch: refs/heads/master
Commit: c8aea7445c3bc724f4f2d4cae37d59748dd0e678
Parents: 4a11d02
Author: z001qdp <Ni...@target.com>
Authored: Mon Jan 23 13:20:52 2017 -0800
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Mon Jan 23 13:20:52 2017 -0800

----------------------------------------------------------------------
 .../mllib/regression/IsotonicRegression.scala   | 151 +++++++++++--------
 .../regression/IsotonicRegressionSuite.scala    |  17 +--
 2 files changed, 97 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8aea744/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 36894d5..2d23650 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -236,9 +236,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
  * Only univariate (single feature) algorithm supported.
  *
  * Sequential PAV implementation based on:
- * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
- *   "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
- *   Available from <a href="http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf">here</a>
+ * Grotzinger, S. J., and C. Witzgall.
+ *   "Projections onto order simplexes." Applied mathematics and Optimization 12.1 (1984): 247-270.
  *
  * Sequential PAV parallelization based on:
  * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
@@ -312,90 +311,118 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
   }
 
   /**
-   * Performs a pool adjacent violators algorithm (PAV).
-   * Uses approach with single processing of data where violators
-   * in previously processed data created by pooling are fixed immediately.
-   * Uses optimization of discovering monotonicity violating sequences (blocks).
+   * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
+   * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
+   * and end indices of blocks.
    *
-   * @param input Input data of tuples (label, feature, weight).
+   * [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied
+   * mathematics and Optimization 12.1 (1984): 247-270.
+   *
+   * [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic
+   * regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439.
+   *
+   * [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex
+   * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
+   * 658-672.
+   *
+   * @param input Input data of tuples (label, feature, weight). Weights must
+                  be non-negative.
    * @return Result tuples (label, feature, weight) where labels were updated
    *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
       input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
 
-    if (input.isEmpty) {
-      return Array.empty
+    val cleanInput = input.filter{ case (y, x, weight) =>
+      require(
+        weight >= 0.0,
+        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
+      )
+      weight > 0
     }
 
-    // Pools sub array within given bounds assigning weighted average value to all elements.
-    def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
-      val poolSubArray = input.slice(start, end + 1)
+    if (cleanInput.isEmpty) {
+      return Array.empty
+    }
 
-      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
-      val weight = poolSubArray.map(_._3).sum
+    // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
+    // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
+    // Initially, each data point is its own block.
+    val blockBounds = Array.range(0, cleanInput.length)
 
-      var i = start
-      while (i <= end) {
-        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
-        i = i + 1
-      }
+    // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
+    // gives the values for the block. Entries that are not at the start of a block
+    // are meaningless.
+    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+      (weight, weight * y)
     }
 
-    var i = 0
-    val len = input.length
-    while (i < len) {
-      var j = i
+    // a few convenience functions to make the code more readable
 
-      // Find monotonicity violating sequence, if any.
-      while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
-        j = j + 1
-      }
+    // blockStart and blockEnd have identical implementations. We create two different
+    // functions to make the code more expressive
+    def blockEnd(start: Int): Int = blockBounds(start)
+    def blockStart(end: Int): Int = blockBounds(end)
 
-      // If monotonicity was not violated, move to next data point.
-      if (i == j) {
-        i = i + 1
-      } else {
-        // Otherwise pool the violating sequence
-        // and check if pooling caused monotonicity violation in previously processed points.
-        while (i >= 0 && input(i)._1 > input(i + 1)._1) {
-          pool(input, i, j)
-          i = i - 1
-        }
+    // the next block starts at the index after the end of this block
+    def nextBlock(start: Int): Int = blockEnd(start) + 1
 
-        i = j
-      }
+    // the previous block ends at the index before the start of this block
+    // we then use blockStart to find the start
+    def prevBlock(start: Int): Int = blockStart(start - 1)
+
+    // Merge two adjacent blocks, updating blockBounds and weights to reflect the merge
+    // Return the start index of the merged block
+    def merge(block1: Int, block2: Int): Int = {
+      assert(
+        blockEnd(block1) + 1 == block2,
+        s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" +
+        s" and [${block2}, ${blockEnd(block2)}]. This is likely a bug in the isotonic regression" +
+        " implementation. Please file a bug report."
+      )
+      blockBounds(block1) = blockEnd(block2)
+      blockBounds(blockEnd(block2)) = block1
+      val w1 = weights(block1)
+      val w2 = weights(block2)
+      weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
+      block1
     }
 
-    // For points having the same prediction, we only keep two boundary points.
-    val compressed = ArrayBuffer.empty[(Double, Double, Double)]
+    // average value of a block
+    def average(start: Int): Double = weights(start)._2 / weights(start)._1
 
-    var (curLabel, curFeature, curWeight) = input.head
-    var rightBound = curFeature
-    def merge(): Unit = {
-      compressed += ((curLabel, curFeature, curWeight))
-      if (rightBound > curFeature) {
-        compressed += ((curLabel, rightBound, 0.0))
+    // Implement Algorithm PAV from [3].
+    // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
+    // want to compress our output as much as possible. Both give correct results.
+    var i = 0
+    while (nextBlock(i) < cleanInput.length) {
+      if (average(i) >= average(nextBlock(i))) {
+        merge(i, nextBlock(i))
+        while((i > 0) && (average(prevBlock(i)) >= average(i))) {
+          i = merge(prevBlock(i), i)
+        }
+      } else {
+        i = nextBlock(i)
       }
     }
-    i = 1
-    while (i < input.length) {
-      val (label, feature, weight) = input(i)
-      if (label == curLabel) {
-        curWeight += weight
-        rightBound = feature
+
+    // construct the output by walking through the blocks in order
+    val output = ArrayBuffer.empty[(Double, Double, Double)]
+    i = 0
+    while (i < cleanInput.length) {
+      // If block size is > 1, a point at the start and end of the block,
+      // each receiving half the weight. Otherwise, a single point with
+      // all the weight.
+      if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
+        output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
+        output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
       } else {
-        merge()
-        curLabel = label
-        curFeature = feature
-        curWeight = weight
-        rightBound = curFeature
+        output += ((average(i), cleanInput(i)._2, weights(i)._1))
       }
-      i += 1
+      i = nextBlock(i)
     }
-    merge()
 
-    compressed.toArray
+    output.toArray
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c8aea744/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
index 94da626..02ea74b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.Matchers
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
@@ -163,17 +163,16 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
   }
 
   test("weighted isotonic regression with negative weights") {
-    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
-
-    assert(model.boundaries === Array(0.0, 1.0, 4.0))
-    assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
+    val ex = intercept[SparkException] {
+      runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
+    }
+    assert(ex.getCause.isInstanceOf[IllegalArgumentException])
   }
 
   test("weighted isotonic regression with zero weights") {
-    val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
-
-    assert(model.boundaries === Array(0.0, 1.0, 4.0))
-    assert(model.predictions === Array(1, 2, 2))
+    val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1, 0), Seq(0, 0, 0, 1, 1, 0), true)
+    assert(model.boundaries === Array(3, 4))
+    assert(model.predictions === Array(1.5, 1.5))
   }
 
   test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org