Posted to commits@spark.apache.org by me...@apache.org on 2015/01/30 22:59:13 UTC

spark git commit: [SPARK-5486] Added validate method to BlockMatrix

Repository: spark
Updated Branches:
  refs/heads/master 0a95085f0 -> 6ee8338b3


[SPARK-5486] Added validate method to BlockMatrix

The `validate` method allows users to debug their `BlockMatrix` when operations like `add` or `multiply` return unexpected results. It checks the following properties of a `BlockMatrix` (a usage sketch follows the list):
- Are the dimensions of the `BlockMatrix` consistent with what the user entered (`nRows`, `nCols`)?
- Are the dimensions of each `MatrixBlock` consistent with what the user entered (`rowsPerBlock`, `colsPerBlock`)?
- Are there any blocks with duplicate indices?
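For illustration, a minimal usage sketch (not part of the commit): it assumes a live `SparkContext` named `sc`, e.g. in spark-shell, and the block data mirrors the new test suite below.

    import org.apache.spark.SparkException
    import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    // A 5 x 4 matrix stored as a 3 x 2 grid of 2 x 2 blocks. The two blocks
    // on the bottom edge are 1 x 2, which is allowed: only non-edge blocks
    // must be exactly rowsPerBlock x colsPerBlock.
    val blocks: Seq[((Int, Int), Matrix)] = Seq(
      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
      ((2, 0), new DenseMatrix(1, 2, Array(1.0, 5.0))),
      ((2, 1), new DenseMatrix(1, 2, Array(0.0, 4.0))))

    val mat = new BlockMatrix(sc.parallelize(blocks), 2, 2)
    mat.validate()  // returns silently: all three properties hold

    // Claiming 3 x 2 blocks contradicts the 2 x 2 blocks above, so the
    // per-block dimension check fails with a SparkException.
    val bad = new BlockMatrix(sc.parallelize(blocks), 3, 2)
    try bad.validate() catch {
      case e: SparkException => println(s"validation failed: ${e.getMessage}")
    }

Since the checks run `countByKey()` and a `foreach` over the blocks RDD, `validate()` triggers Spark jobs; it is a debugging aid, not something to call on every operation.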

Author: Burak Yavuz <br...@gmail.com>

Closes #4279 from brkyvz/SPARK-5486 and squashes the following commits:

c152a73 [Burak Yavuz] addressed code review v2
598c583 [Burak Yavuz] merged master
b55ac5c [Burak Yavuz] addressed code review v1
25f083b [Burak Yavuz] simplify implementation
0aa519a [Burak Yavuz] [SPARK-5486] Added validate method to BlockMatrix


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ee8338b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ee8338b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ee8338b

Branch: refs/heads/master
Commit: 6ee8338b377de9dc0adb5b26d9ea9e8519eb58ab
Parents: 0a95085
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri Jan 30 13:59:10 2015 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Fri Jan 30 13:59:10 2015 -0800

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 47 +++++++++++++++++---
 .../linalg/distributed/BlockMatrixSuite.scala   | 42 +++++++++++++++++
 2 files changed, 84 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ee8338b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 693419f..a640597 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import breeze.linalg.{DenseMatrix => BDM}
 
-import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.{SparkException, Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -158,11 +158,13 @@ class BlockMatrix(
   private[mllib] var partitioner: GridPartitioner =
     GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
 
+  private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
+
   /** Estimates the dimensions of the matrix. */
   private def estimateDim(): Unit = {
-    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
-      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
-        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      (blockRowIndex.toLong * rowsPerBlock + m,
+        blockColIndex.toLong * colsPerBlock + n)
     }.reduce { (x0, x1) =>
       (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
     }
@@ -172,6 +174,41 @@ class BlockMatrix(
     assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
+  def validate(): Unit = {
+    logDebug("Validating BlockMatrix...")
+    // check if the matrix is larger than the claimed dimensions
+    estimateDim()
+    logDebug("BlockMatrix dimensions are okay...")
+
+    // Check if there are multiple MatrixBlocks with the same index.
+    blockInfo.countByKey().foreach { case (key, cnt) =>
+      if (cnt > 1) {
+        throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
+          "remove blocks with duplicate indices.")
+      }
+    }
+    logDebug("MatrixBlock indices are okay...")
+    // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
+    // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+    val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
+      s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
+      s"dimensions. You may use the repartition method to fix this issue."
+    blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
+          (blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
+      }
+      if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
+        (blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
+      }
+    }
+    logDebug("MatrixBlock dimensions are okay...")
+    logDebug("BlockMatrix is valid!")
+  }
+
   /** Caches the underlying RDD. */
   def cache(): this.type = {
     blocks.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee8338b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 03f3430..461f1f9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import breeze.linalg.{DenseMatrix => BDM}
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
@@ -147,6 +148,47 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(gridBasedMat.toBreeze() === expected)
   }
 
+  test("validate") {
+    // No error
+    gridBasedMat.validate()
+    // Wrong MatrixBlock dimensions
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val rdd = sc.parallelize(blocks, numPartitions)
+    val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
+    val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
+    intercept[SparkException] {
+      wrongRowPerParts.validate()
+    }
+    intercept[SparkException] {
+      wrongColPerParts.validate()
+    }
+    // Wrong BlockMatrix dimensions
+    val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
+    intercept[AssertionError] {
+      wrongRowSize.validate()
+    }
+    val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
+    intercept[AssertionError] {
+      wrongColSize.validate()
+    }
+    // Duplicate indices
+    val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
+    intercept[SparkException] {
+      dupMatrix.validate()
+    }
+  }
+
   test("transpose") {
     val expected = BDM(
       (1.0, 0.0, 3.0, 0.0, 0.0),

