You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2015/01/28 19:06:45 UTC

spark git commit: [SPARK-3974][MLlib] Distributed Block Matrix Abstractions

Repository: spark
Updated Branches:
  refs/heads/master 622ff09d0 -> eeb53bf90


[SPARK-3974][MLlib] Distributed Block Matrix Abstractions

This pull request includes the abstractions for the distributed BlockMatrix representation.
`BlockMatrix` will allow users to store very large matrices in small blocks of local matrices. Specific partitioners, such as `RowBasedPartitioner` and `ColumnBasedPartitioner`, are implemented in order to optimize addition and multiplication operations that will be added in a following PR.

This work is based on the ml-matrix repo developed at the AMPLab at UC Berkeley, CA.
https://github.com/amplab/ml-matrix

Additional thanks to rezazadeh, shivaram, and mengxr for guidance on the design.

Author: Burak Yavuz <br...@gmail.com>
Author: Xiangrui Meng <me...@databricks.com>
Author: Burak Yavuz <br...@dn51t42l.sunet>
Author: Burak Yavuz <br...@dn51t4rd.sunet>
Author: Burak Yavuz <br...@dn0a221430.sunet>

Closes #3200 from brkyvz/SPARK-3974 and squashes the following commits:

a8eace2 [Burak Yavuz] Merge pull request #2 from mengxr/brkyvz-SPARK-3974
feb32a7 [Xiangrui Meng] update tests
e1d3ee8 [Xiangrui Meng] minor updates
24ec7b8 [Xiangrui Meng] update grid partitioner
5eecd48 [Burak Yavuz] fixed gridPartitioner and added tests
140f20e [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-3974
1694c9e [Burak Yavuz] almost finished addressing comments
f9d664b [Burak Yavuz] updated API and modified partitioning scheme
eebbdf7 [Burak Yavuz] preliminary changes addressing code review
1a63b20 [Burak Yavuz] [SPARK-3974] Remove setPartition method. Isn't required
1e8bb2a [Burak Yavuz] [SPARK-3974] Change return type of cache and persist
239ab4b [Burak Yavuz] [SPARK-3974] Addressed @jkbradley's comments
ba414d2 [Burak Yavuz] [SPARK-3974] fixed frobenius norm
ab6cde0 [Burak Yavuz] [SPARK-3974] Modifications cleaning code up, making size calculation more robust
9ae85aa [Burak Yavuz] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable
d033861 [Burak Yavuz] [SPARK-3974] Removed SubMatrixInfo and added constructor without partitioner
49b9586 [Burak Yavuz] [SPARK-3974] Updated testing utils from master
645afbe [Burak Yavuz] [SPARK-3974] Pull latest master
b05aabb [Burak Yavuz] [SPARK-3974] Updated tests to reflect changes
19c17e8 [Burak Yavuz] [SPARK-3974] Changed blockIdRow and blockIdCol
589fbb6 [Burak Yavuz] [SPARK-3974] Code review feedback addressed
aa8f086 [Burak Yavuz] [SPARK-3974] Additional comments added
f378e16 [Burak Yavuz] [SPARK-3974] Block Matrix Abstractions ready
b693209 [Burak Yavuz] Ready for Pull request


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeb53bf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeb53bf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeb53bf9

Branch: refs/heads/master
Commit: eeb53bf90e93b298eff48387d2e9ad699b52d001
Parents: 622ff09
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jan 28 10:06:37 2015 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Wed Jan 28 10:06:37 2015 -0800

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 216 +++++++++++++++++++
 .../linalg/distributed/BlockMatrixSuite.scala   | 135 ++++++++++++
 2 files changed, 351 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eeb53bf9/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
new file mode 100644
index 0000000..0ab74ba
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import breeze.linalg.{DenseMatrix => BDM}
+
+import org.apache.spark.{Logging, Partitioner}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A grid partitioner, which uses a regular grid to partition coordinates.
+ *
+ * @param rows Number of rows.
+ * @param cols Number of columns.
+ * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
+ * @param colsPerPart Number of columns per partition, which may be less at the right edge.
+ */
+private[mllib] class GridPartitioner(
+    val rows: Int,
+    val cols: Int,
+    val rowsPerPart: Int,
+    val colsPerPart: Int) extends Partitioner {
+
+  require(rows > 0)
+  require(cols > 0)
+  require(rowsPerPart > 0)
+  require(colsPerPart > 0)
+
+  private val rowPartitions = math.ceil(rows / rowsPerPart).toInt
+  private val colPartitions = math.ceil(cols / colsPerPart).toInt
+
+  override val numPartitions = rowPartitions * colPartitions
+
+  /**
+   * Returns the index of the partition the input coordinate belongs to.
+   *
+   * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
+   *            multiplication. k is ignored in computing partitions.
+   * @return The index of the partition, which the coordinate belongs to.
+   */
+  override def getPartition(key: Any): Int = {
+    key match {
+      case (i: Int, j: Int) =>
+        getPartitionId(i, j)
+      case (i: Int, j: Int, _: Int) =>
+        getPartitionId(i, j)
+      case _ =>
+        throw new IllegalArgumentException(s"Unrecognized key: $key.")
+    }
+  }
+
+  /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
+  private def getPartitionId(i: Int, j: Int): Int = {
+    require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
+    require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
+    i / rowsPerPart + j / colsPerPart * rowPartitions
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case r: GridPartitioner =>
+        (this.rows == r.rows) && (this.cols == r.cols) &&
+          (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
+      case _ =>
+        false
+    }
+  }
+}
+
+private[mllib] object GridPartitioner {
+
+  /** Creates a new [[GridPartitioner]] instance. */
+  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
+    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+  }
+
+  /** Creates a new [[GridPartitioner]] instance with the input suggested number of partitions. */
+  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
+    require(suggestedNumPartitions > 0)
+    val scale = 1.0 / math.sqrt(suggestedNumPartitions)
+    val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt
+    val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt
+    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
+  }
+}
+
+/**
+ * Represents a distributed matrix in blocks of local matrices.
+ *
+ * @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form
+ *               this distributed matrix.
+ * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+ *                     rows are not required to have the given number of rows
+ * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+ *                     columns are not required to have the given number of columns
+ * @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
+ *              the number of rows will be calculated when `numRows` is invoked.
+ * @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
+ *              zero, the number of columns will be calculated when `numCols` is invoked.
+ */
+class BlockMatrix(
+    val blocks: RDD[((Int, Int), Matrix)],
+    val rowsPerBlock: Int,
+    val colsPerBlock: Int,
+    private var nRows: Long,
+    private var nCols: Long) extends DistributedMatrix with Logging {
+
+  private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)
+
+  /**
+   * Alternate constructor for BlockMatrix without the input of the number of rows and columns.
+   *
+   * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
+   * @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
+   *                     rows are not required to have the given number of rows
+   * @param colsPerBlock Number of columns that make up each block. The blocks forming the final
+   *                     columns are not required to have the given number of columns
+   */
+  def this(
+      rdd: RDD[((Int, Int), Matrix)],
+      rowsPerBlock: Int,
+      colsPerBlock: Int) = {
+    this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
+  }
+
+  override def numRows(): Long = {
+    if (nRows <= 0L) estimateDim()
+    nRows
+  }
+
+  override def numCols(): Long = {
+    if (nCols <= 0L) estimateDim()
+    nCols
+  }
+
+  val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
+  val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt
+
+  private[mllib] var partitioner: GridPartitioner =
+    GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
+
+  /** Estimates the dimensions of the matrix. */
+  private def estimateDim(): Unit = {
+    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
+      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
+        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    }.reduce { (x0, x1) =>
+      (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
+    }
+    if (nRows <= 0L) nRows = rows
+    assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
+    if (nCols <= 0L) nCols = cols
+    assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
+  }
+
+  /** Caches the underlying RDD. */
+  def cache(): this.type = {
+    blocks.cache()
+    this
+  }
+
+  /** Persists the underlying RDD with the specified storage level. */
+  def persist(storageLevel: StorageLevel): this.type = {
+    blocks.persist(storageLevel)
+    this
+  }
+
+  /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
+  def toLocalMatrix(): Matrix = {
+    require(numRows() < Int.MaxValue, "The number of rows of this matrix should be less than " +
+      s"Int.MaxValue. Currently numRows: ${numRows()}")
+    require(numCols() < Int.MaxValue, "The number of columns of this matrix should be less than " +
+      s"Int.MaxValue. Currently numCols: ${numCols()}")
+    require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
+      s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
+    val m = numRows().toInt
+    val n = numCols().toInt
+    val mem = m * n / 125000
+    if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")
+
+    val localBlocks = blocks.collect()
+    val values = new Array[Double](m * n)
+    localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
+      val rowOffset = blockRowIndex * rowsPerBlock
+      val colOffset = blockColIndex * colsPerBlock
+      submat.foreachActive { (i, j, v) =>
+        val indexOffset = (j + colOffset) * m + rowOffset + i
+        values(indexOffset) = v
+      }
+    }
+    new DenseMatrix(m, n, values)
+  }
+
+  /** Collects data and assembles a local dense breeze matrix (for test only). */
+  private[mllib] def toBreeze(): BDM[Double] = {
+    val localMat = toLocalMatrix()
+    new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/eeb53bf9/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
new file mode 100644
index 0000000..05efbc8
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg.distributed
+
+import scala.util.Random
+
+import breeze.linalg.{DenseMatrix => BDM}
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
+
+  val m = 5
+  val n = 4
+  val rowPerPart = 2
+  val colPerPart = 2
+  val numPartitions = 3
+  var gridBasedMat: BlockMatrix = _
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+
+    gridBasedMat = new BlockMatrix(sc.parallelize(blocks, numPartitions), rowPerPart, colPerPart)
+  }
+
+  test("size") {
+    assert(gridBasedMat.numRows() === m)
+    assert(gridBasedMat.numCols() === n)
+  }
+
+  test("grid partitioner") {
+    val random = new Random()
+    // This should generate a 4x4 grid of 1x2 blocks.
+    val part0 = GridPartitioner(4, 7, suggestedNumPartitions = 12)
+    val expected0 = Array(
+      Array(0, 0, 4, 4,  8,  8, 12),
+      Array(1, 1, 5, 5,  9,  9, 13),
+      Array(2, 2, 6, 6, 10, 10, 14),
+      Array(3, 3, 7, 7, 11, 11, 15))
+    for (i <- 0 until 4; j <- 0 until 7) {
+      assert(part0.getPartition((i, j)) === expected0(i)(j))
+      assert(part0.getPartition((i, j, random.nextInt())) === expected0(i)(j))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((-1, 0))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((4, 0))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((0, -1))
+    }
+
+    intercept[IllegalArgumentException] {
+      part0.getPartition((0, 7))
+    }
+
+    val part1 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+    val expected1 = Array(
+      Array(0, 2),
+      Array(1, 3))
+    for (i <- 0 until 2; j <- 0 until 2) {
+      assert(part1.getPartition((i, j)) === expected1(i)(j))
+      assert(part1.getPartition((i, j, random.nextInt())) === expected1(i)(j))
+    }
+
+    val part2 = GridPartitioner(2, 2, suggestedNumPartitions = 5)
+    assert(part0 !== part2)
+    assert(part1 === part2)
+
+    val part3 = new GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+    val expected3 = Array(
+      Array(0, 0, 2),
+      Array(1, 1, 3))
+    for (i <- 0 until 2; j <- 0 until 3) {
+      assert(part3.getPartition((i, j)) === expected3(i)(j))
+      assert(part3.getPartition((i, j, random.nextInt())) === expected3(i)(j))
+    }
+
+    val part4 = GridPartitioner(2, 3, rowsPerPart = 1, colsPerPart = 2)
+    assert(part3 === part4)
+
+    intercept[IllegalArgumentException] {
+      new GridPartitioner(2, 2, rowsPerPart = 0, colsPerPart = 1)
+    }
+
+    intercept[IllegalArgumentException] {
+      GridPartitioner(2, 2, rowsPerPart = 1, colsPerPart = 0)
+    }
+
+    intercept[IllegalArgumentException] {
+      GridPartitioner(2, 2, suggestedNumPartitions = 0)
+    }
+  }
+
+  test("toBreeze and toLocalMatrix") {
+    val expected = BDM(
+      (1.0, 0.0, 0.0, 0.0),
+      (0.0, 2.0, 1.0, 0.0),
+      (3.0, 1.0, 1.0, 0.0),
+      (0.0, 1.0, 2.0, 1.0),
+      (0.0, 0.0, 1.0, 5.0))
+
+    val dense = Matrices.fromBreeze(expected).asInstanceOf[DenseMatrix]
+    assert(gridBasedMat.toLocalMatrix() === dense)
+    assert(gridBasedMat.toBreeze() === expected)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org