You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/09 01:57:18 UTC

[spark] branch master updated: [SPARK-26881][MLLIB] Heuristic for tree aggregate depth

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dfa2328  [SPARK-26881][MLLIB] Heuristic for tree aggregate depth
dfa2328 is described below

commit dfa2328e2806edf6f079dcbf74f8ece764c1f130
Author: Rafael Renaudin <re...@gmail.com>
AuthorDate: Mon Apr 8 20:56:53 2019 -0500

    [SPARK-26881][MLLIB] Heuristic for tree aggregate depth
    
    Changes proposed:
    - Adding method to compute treeAggregate depth required to avoid exceeding driver max result size (first commit)
    - Using it in the computation  of grammian of RowMatrix (second commit)
    
    Tests:
    - Unit Test wise, one unit test checking the behavior of the depth computation method
    - Tested at scale on hadoop cluster by doing PCA on a large dataset (needed depth 3 to succeed)
    
    Debatable choice:
    
    I'm not sure if RDD API is the right place to put the depth computation method. The advantage of it is that it allows to access driver max result size, and rdd number of partitions, to set default arguments for the method. Semantically, such a method might belong to something like org.apache.spark.util.Utils though.
    
    Closes #23983 from gagafunctor/Heuristic_for_treeAggregate_depth.
    
    Authored-by: Rafael Renaudin <re...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../spark/mllib/linalg/distributed/RowMatrix.scala | 34 +++++++++++++++++++++-
 .../mllib/linalg/distributed/RowMatrixSuite.scala  | 20 +++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 56caeac..43f48be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -27,6 +27,7 @@ import breeze.numerics.{sqrt => brzSqrt}
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
 import org.apache.spark.rdd.RDD
@@ -117,6 +118,7 @@ class RowMatrix @Since("1.0.0") (
     // Computes n*(n+1)/2, avoiding overflow in the multiplication.
     // This succeeds when n <= 65535, which is checked above
     val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
+    val gramianSizeInBytes = nt * 8L
 
     // Compute the upper triangular part of the gram matrix.
     val GU = rows.treeAggregate(null.asInstanceOf[BDV[Double]])(
@@ -136,7 +138,8 @@ class RowMatrix @Since("1.0.0") (
           U1
         } else {
           U1 += U2
-        }
+        },
+      depth = getTreeAggregateIdealDepth(gramianSizeInBytes)
     )
 
     RowMatrix.triuToFull(n, GU.data)
@@ -775,6 +778,35 @@ class RowMatrix @Since("1.0.0") (
         s"The number of rows $m is different from what specified or previously computed: ${nRows}.")
     }
   }
+
+  /**
+   * Computing desired tree aggregate depth necessary to avoid exceeding
+   * driver.MaxResultSize during aggregation.
+   * Based on the formulae: (numPartitions)^(1/depth) * objectSize <= DriverMaxResultSize
+   * @param aggregatedObjectSizeInBytes the size, in megabytes, of the object being tree aggregated
+   */
+  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: Long) = {
+    require(aggregatedObjectSizeInBytes > 0,
+      "Cannot compute aggregate depth heuristic based on a zero-size object to aggregate")
+
+    val maxDriverResultSizeInBytes = rows.conf.get[Long](MAX_RESULT_SIZE)
+
+    require(maxDriverResultSizeInBytes > aggregatedObjectSizeInBytes,
+      s"Cannot aggregate object of size $aggregatedObjectSizeInBytes Bytes, "
+        + s"as it's bigger than maxResultSize ($maxDriverResultSizeInBytes Bytes)")
+
+    val numerator = math.log(rows.getNumPartitions)
+    val denominator = math.log(maxDriverResultSizeInBytes) - math.log(aggregatedObjectSizeInBytes)
+    val desiredTreeDepth = math.ceil(numerator / denominator)
+
+    if (desiredTreeDepth > 4) {
+      logWarning(
+        s"Desired tree depth for treeAggregation is big ($desiredTreeDepth)."
+          + "Consider increasing driver max result size or reducing number of partitions")
+    }
+
+    math.min(math.max(1, desiredTreeDepth), 10).toInt
+  }
 }
 
 @Since("1.0.0")
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index a4ca4f0..a0c4c68 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -101,6 +101,26 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("getTreeAggregateIdealDepth") {
+    val nbPartitions = 100
+    val vectors = sc.emptyRDD[Vector]
+      .repartition(nbPartitions)
+    val rowMat = new RowMatrix(vectors)
+
+    assert(rowMat.getTreeAggregateIdealDepth(100 * 1024 * 1024) === 2)
+    assert(rowMat.getTreeAggregateIdealDepth(110 * 1024 * 1024) === 3)
+    assert(rowMat.getTreeAggregateIdealDepth(700 * 1024 * 1024) === 10)
+
+    val zeroSizeException = intercept[Exception]{
+      rowMat.getTreeAggregateIdealDepth(0)
+    }
+    assert(zeroSizeException.getMessage.contains("zero-size object to aggregate"))
+    val objectBiggerThanResultSize = intercept[Exception]{
+      rowMat.getTreeAggregateIdealDepth(1100 * 1024 * 1024)
+    }
+    assert(objectBiggerThanResultSize.getMessage.contains("it's bigger than maxResultSize"))
+  }
+
   test("similar columns") {
     val colMags = Vectors.dense(math.sqrt(126), math.sqrt(66), math.sqrt(94))
     val expected = BDM(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org