You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2020/10/03 18:22:41 UTC

[spark] branch branch-3.0 updated: [SPARK-33043][ML] Handle spark.driver.maxResultSize=0 in RowMatrix heuristic computation

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c9b6271  [SPARK-33043][ML] Handle spark.driver.maxResultSize=0 in RowMatrix heuristic computation
c9b6271 is described below

commit c9b62711fdec24160c4bdeff8fc09eedb0b75ee0
Author: Sean Owen <sr...@gmail.com>
AuthorDate: Sat Oct 3 13:12:55 2020 -0500

    [SPARK-33043][ML] Handle spark.driver.maxResultSize=0 in RowMatrix heuristic computation
    
    ### What changes were proposed in this pull request?
    
    RowMatrix contains a heuristic computation based on spark.driver.maxResultSize. However, when this value is set to 0 (which means unlimited result size), the computation fails because it takes the log of 0. The fix is to handle this setting correctly by using a tree depth of 1 in the RowMatrix method.
    
    ### Why are the changes needed?
    
    Simple bug fix to make several Spark ML functions which use RowMatrix run correctly in this case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    None, other than the bug fix itself.
    
    ### How was this patch tested?
    
    Existing RowMatrix tests plus a new test.
    
    Closes #29925 from srowen/SPARK-33043.
    
    Authored-by: Sean Owen <sr...@gmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
    (cherry picked from commit f86171aea43479f54ac2bbbca8f128baa3fc4a8c)
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../apache/spark/mllib/linalg/distributed/RowMatrix.scala |  6 +++++-
 .../spark/mllib/linalg/distributed/RowMatrixSuite.scala   | 15 +++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 20e26ce..07b9d91 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -786,11 +786,15 @@ class RowMatrix @Since("1.0.0") (
    * Based on the formulae: (numPartitions)^(1/depth) * objectSize <= DriverMaxResultSize
    * @param aggregatedObjectSizeInBytes the size, in megabytes, of the object being tree aggregated
    */
-  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: Long) = {
+  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: Long): Int = {
     require(aggregatedObjectSizeInBytes > 0,
       "Cannot compute aggregate depth heuristic based on a zero-size object to aggregate")
 
     val maxDriverResultSizeInBytes = rows.conf.get[Long](MAX_RESULT_SIZE)
+    if (maxDriverResultSizeInBytes <= 0) {
+      // Unlimited result size, so 1 is OK
+      return 1
+    }
 
     require(maxDriverResultSizeInBytes > aggregatedObjectSizeInBytes,
       s"Cannot aggregate object of size $aggregatedObjectSizeInBytes Bytes, "
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 0a4b119..adc4eee 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -25,6 +25,7 @@ import breeze.linalg.{norm => brzNorm, svd => brzSvd, DenseMatrix => BDM, DenseV
 import breeze.numerics.abs
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
 import org.apache.spark.mllib.random.RandomRDDs
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, MLlibTestSparkContext}
@@ -121,6 +122,20 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(objectBiggerThanResultSize.getMessage.contains("it's bigger than maxResultSize"))
   }
 
+  test("SPARK-33043: getTreeAggregateIdealDepth with unlimited driver size") {
+    val originalMaxResultSize = sc.conf.get[Long](MAX_RESULT_SIZE)
+    sc.conf.set(MAX_RESULT_SIZE, 0L)
+    try {
+      val nbPartitions = 100
+      val vectors = sc.emptyRDD[Vector]
+        .repartition(nbPartitions)
+      val rowMat = new RowMatrix(vectors)
+      assert(rowMat.getTreeAggregateIdealDepth(700 * 1024 * 1024) === 1)
+    } finally {
+      sc.conf.set(MAX_RESULT_SIZE, originalMaxResultSize)
+    }
+  }
+
   test("similar columns") {
     val colMags = Vectors.dense(math.sqrt(126), math.sqrt(66), math.sqrt(94))
     val expected = BDM(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org