Posted to commits@spark.apache.org by br...@apache.org on 2017/01/27 04:10:31 UTC

spark git commit: [SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low-parallelism problems by splitting the middle dimension in matrix multiplication

Repository: spark
Updated Branches:
  refs/heads/master 9f523d319 -> 1191fe267


[SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low-parallelism problems by splitting the middle dimension in matrix multiplication

## What changes were proposed in this pull request?

### The problem in the current block matrix multiplication

As described in JIRA https://issues.apache.org/jira/browse/SPARK-18218, block matrix multiplication in Spark can run into trouble. Suppose we multiply an `M*N` matrix A by an `N*P` matrix B, where N is much larger than M and P; then the following problems may occur:
- When the middle dimension N is too large, reducers can OOM.
- Even if no OOM occurs, parallelism is still too low.
- When N is much larger than M and P, and matrices A and B have many partitions, the result may be partitioned too finely along the M and P dimensions, which greatly increases the shuffled data size. (I will explain this in detail below.)

### Key point of my improvement

In this PR, I introduce a `numMidDimSplits` parameter and improve the algorithm to resolve these problems.

To understand the improvement in this PR, first let me give a simple case to explain how the current multiplication works and what causes the problems above:

Suppose block matrix A contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), arranged in 2 rows and 100 columns:
```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```
and block matrix B also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), arranged in 100 rows and 2 columns:
```
B00    B01
B10    B11
B20    B21
...
B99,0  B99,1
```
Suppose all blocks in the two matrices are dense for now.
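For concreteness, here is a minimal sketch of how two such matrices could be constructed (assuming an existing `SparkContext` named `sc`; the 2x2 per-block size and all-ones block contents are arbitrary illustration choices, not from the patch):
```
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// A: 2 x 100 grid of blocks; B: 100 x 2 grid of blocks (2x2 entries each).
val aBlocks: Seq[((Int, Int), Matrix)] =
  for (i <- 0 until 2; j <- 0 until 100) yield ((i, j), DenseMatrix.ones(2, 2))
val bBlocks: Seq[((Int, Int), Matrix)] =
  for (i <- 0 until 100; j <- 0 until 2) yield ((i, j), DenseMatrix.ones(2, 2))

val A = new BlockMatrix(sc.parallelize(aBlocks), 2, 2)
val B = new BlockMatrix(sc.parallelize(bBlocks), 2, 2)
```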
Now we call A.multiply(B). Suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (it cannot contain more because the result matrix holds only `2 * 2` blocks). The current algorithm then involves two shuffle steps:

**step-1**
Step-1 generates 4 reducers; I tag them reducer-00, reducer-01, reducer-10, and reducer-11. The data is shuffled as follows:
```
A00 A01 A02 ... A0,99
B00 B10 B20 ... B99,0    shuffled into reducer-00

A00 A01 A02 ... A0,99
B01 B11 B21 ... B99,1    shuffled into reducer-01

A10 A11 A12 ... A1,99
B00 B10 B20 ... B99,0    shuffled into reducer-10

A10 A11 A12 ... A1,99
B01 B11 B21 ... B99,1    shuffled into reducer-11
```

The shuffle above is a `cogroup` transformation; note that each reducer contains **only one group**.
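To see why one group per reducer is dangerous, here is a toy, local `cogroup` (assuming an existing `SparkContext` `sc`; the keys and string "blocks" are made up) showing that all values for a key are materialized together:
```
val left  = sc.parallelize(Seq((0, "A00"), (0, "A01"), (1, "A10")))
val right = sc.parallelize(Seq((0, "B00"), (0, "B10"), (1, "B01")))
left.cogroup(right, numPartitions = 2).collect().foreach {
  case (key, (as, bs)) =>
    // `as` and `bs` are in-memory iterables holding the entire group,
    // just like a step-1 reducer holding a row of A plus a column of B.
    println(s"group $key: ${as.size + bs.size} blocks in memory")
}
```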

**step-2**
Step-2 performs an `aggregateByKey` transformation on the result of step-1, again with 4 reducers, and generates the final result RDD of 4 partitions, each containing one block.

The main problems are in step-1. We have only 4 reducers, but matrices A and B contain 400 blocks in total, so the reducer count is obviously too small.
Also, each reducer contains only one group (in the `cogroup` sense), and each group holds 200 blocks. This is a serious issue because `cogroup` loads each whole group into memory when computing, so the algorithm does not scale. Suppose matrix A had 10000 column blocks, or more, instead of 100: each reducer would then load 20000 blocks into memory, easily causing reducer OOM.

This PR resolves the problems described above.
When a matrix A of dimension M * N multiplies a matrix B of dimension N * P, the middle dimension N is the key point: if N is large, the current multiplication implementation performs badly.
In this PR, I introduce a `numMidDimSplits` parameter that specifies how many splits to cut on the middle dimension N.
Still using the example above, set `numMidDimSplits = 10`; step-1 now generates 40 reducers:

Each reducer-ij above is now split into 10 reducers, reducer-ij0 through reducer-ij9, and each reducer receives 20 blocks.
The shuffle now works as follows:

**reducer-000 to reducer-009**
```
A0,0 A0,10 A0,20 ... A0,90
B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000

A0,1 A0,11 A0,21 ... A0,91
B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001

A0,2 A0,12 A0,22 ... A0,92
B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002

...

A0,9 A0,19 A0,29 ... A0,99
B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
```

**reducer-010 to reducer-019**
```
A0,0 A0,10 A0,20 ... A0,90
B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010

A0,1 A0,11 A0,21 ... A0,91
B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011

A0,2 A0,12 A0,22 ... A0,92
B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012

...

A0,9 A0,19 A0,29 ... A0,99
B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** are analogous, so I omit them.
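The layout above follows from a simple modulo mapping, as in the patch below: a block of A in block-column `j` goes to split `j % numMidDimSplits` of each result partition it feeds, and symmetrically for blocks of B by block-row. A self-contained sketch of that mapping (the function name is illustrative, not from the patch):
```
// Destinations of an A block: one intermediate reducer per result partition
// `pid` it contributes to, offset by the middle-dimension split index.
def leftBlockDestinations(
    blockColIndex: Int,          // block-column index j of the A block
    resultPartitions: Set[Int],  // result partitions this block feeds
    numMidDimSplits: Int): Set[Int] = {
  val split = blockColIndex % numMidDimSplits
  resultPartitions.map(pid => pid * numMidDimSplits + split)
}

leftBlockDestinations(13, Set(0, 2), 10)  // Set(3, 23)
```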

### API for this optimized algorithm

I add a new API as follows:
```
  def multiply(
      other: BlockMatrix,
      numMidDimSplits: Int  // number of middle-dimension splits, explained above
    ): BlockMatrix
```
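With the example matrices from the walkthrough, a usage sketch of the new overload (`A` and `B` here are those example matrices, not identifiers from the patch):
```
// numMidDimSplits = 10: step-1 grows from 4 to 4 * 10 = 40 reducers,
// and each reducer's cogroup group shrinks from 200 blocks to 20.
val C: BlockMatrix = A.multiply(B, 10)
```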

### Shuffled data size analysis (compared under the same parallelism)

The optimization has a subtle influence on the total shuffled data size. An appropriate `numMidDimSplits` significantly reduces the shuffled data size,
but too large a value can increase it again. Rather than introduce a formula and make things too complex, for now I illustrate with a simple case:

Suppose we have two equally sized square matrices X and Y, each with `16 numRowBlocks * 16 numColBlocks`, and both dense. Now let me analyze the shuffled data size in the following cases:

**case 1: X and Y both partitioned into 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1**
ShuffledDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks (step-1 + step-2)
parallelism = 16 * 16 * 1 = 256 // use the step-1 reducer count as the parallelism, since step-1 dominates the computation time of this algorithm

**case 2: X and Y both partitioned into 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4**
ShuffledDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks (step-1 + step-2)
parallelism = 8 * 8 * 4 = 256 // use the step-1 reducer count as the parallelism, since step-1 dominates the computation time of this algorithm

**Both cases have parallelism = 256.** Case 1 (`numMidDimSplits = 1`) is equivalent to the current implementation in MLlib, but case 2 shuffles only 60.6% as much data as case 1. **This shows that, under the same parallelism, a proper `numMidDimSplits` significantly reduces the shuffled data size.**
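The two figures can be checked with plain arithmetic:
```
val case1 = 16 * 16 * (16 + 16) + 16 * 16    // 8448 blocks shuffled
val case2 = 8 * 8 * (32 + 32) + 16 * 16 * 4  // 5120 blocks shuffled
val ratio = case2.toDouble / case1           // 5120.0 / 8448 ≈ 0.606
```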

## How was this patch tested?

Test suites added.
Running result:
![blockmatrix](https://cloud.githubusercontent.com/assets/19235986/21600989/5e162cc2-d1bf-11e6-868c-0ec29190b605.png)

Author: WeichenXu <We...@outlook.com>

Closes #15730 from WeichenXu123/optim_block_matrix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1191fe26
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1191fe26
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1191fe26

Branch: refs/heads/master
Commit: 1191fe267d2faad2a99a83f3375ce2d9d382cfa0
Parents: 9f523d3
Author: WeichenXu <We...@outlook.com>
Authored: Thu Jan 26 20:10:17 2017 -0800
Committer: Burak Yavuz <br...@gmail.com>
Committed: Thu Jan 26 20:10:17 2017 -0800

----------------------------------------------------------------------
 .../mllib/linalg/distributed/BlockMatrix.scala  | 48 +++++++++++++++++---
 .../linalg/distributed/BlockMatrixSuite.scala   | 22 ++++++---
 2 files changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1191fe26/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index ff81a2f..20d68a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -425,22 +425,27 @@ class BlockMatrix @Since("1.3.0") (
    */
   private[distributed] def simulateMultiply(
       other: BlockMatrix,
-      partitioner: GridPartitioner): (BlockDestinations, BlockDestinations) = {
-    val leftMatrix = blockInfo.keys.collect() // blockInfo should already be cached
-    val rightMatrix = other.blocks.keys.collect()
+      partitioner: GridPartitioner,
+      midDimSplitNum: Int): (BlockDestinations, BlockDestinations) = {
+    val leftMatrix = blockInfo.keys.collect()
+    val rightMatrix = other.blockInfo.keys.collect()
 
     val rightCounterpartsHelper = rightMatrix.groupBy(_._1).mapValues(_.map(_._2))
     val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
       val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex, Array.empty[Int])
       val partitions = rightCounterparts.map(b => partitioner.getPartition((rowIndex, b)))
-      ((rowIndex, colIndex), partitions.toSet)
+      val midDimSplitIndex = colIndex % midDimSplitNum
+      ((rowIndex, colIndex),
+        partitions.toSet.map((pid: Int) => pid * midDimSplitNum + midDimSplitIndex))
     }.toMap
 
     val leftCounterpartsHelper = leftMatrix.groupBy(_._2).mapValues(_.map(_._1))
     val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) =>
       val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex, Array.empty[Int])
       val partitions = leftCounterparts.map(b => partitioner.getPartition((b, colIndex)))
-      ((rowIndex, colIndex), partitions.toSet)
+      val midDimSplitIndex = rowIndex % midDimSplitNum
+      ((rowIndex, colIndex),
+        partitions.toSet.map((pid: Int) => pid * midDimSplitNum + midDimSplitIndex))
     }.toMap
 
     (leftDestinations, rightDestinations)
@@ -459,14 +464,39 @@ class BlockMatrix @Since("1.3.0") (
    */
   @Since("1.3.0")
   def multiply(other: BlockMatrix): BlockMatrix = {
+    multiply(other, 1)
+  }
+
+  /**
+   * Left multiplies this [[BlockMatrix]] to `other`, another [[BlockMatrix]]. The `colsPerBlock`
+   * of this matrix must equal the `rowsPerBlock` of `other`. If `other` contains
+   * `SparseMatrix`, they will have to be converted to a `DenseMatrix`. The output
+   * [[BlockMatrix]] will only consist of blocks of `DenseMatrix`. This may cause
+   * some performance issues until support for multiplying two sparse matrices is added.
+   * Blocks with duplicate indices will be added with each other.
+   *
+   * @param other Matrix `B` in `A * B = C`
+   * @param numMidDimSplits Number of splits to cut on the middle dimension when doing
+   *                        multiplication. For example, when multiplying a Matrix `A` of
+   *                        size `m x n` with Matrix `B` of size `n x k`, this parameter
+   *                        configures the parallelism to use when grouping the matrices. The
+   *                        parallelism will increase from `m x k` to `m x k x numMidDimSplits`,
+   *                        which in some cases also reduces total shuffled data.
+   */
+  @Since("2.2.0")
+  def multiply(
+      other: BlockMatrix,
+      numMidDimSplits: Int): BlockMatrix = {
     require(numCols() == other.numRows(), "The number of columns of A and the number of rows " +
       s"of B must be equal. A.numCols: ${numCols()}, B.numRows: ${other.numRows()}. If you " +
       "think they should be equal, try setting the dimensions of A and B explicitly while " +
       "initializing them.")
+    require(numMidDimSplits > 0, "numMidDimSplits should be a positive integer.")
     if (colsPerBlock == other.rowsPerBlock) {
       val resultPartitioner = GridPartitioner(numRowBlocks, other.numColBlocks,
         math.max(blocks.partitions.length, other.blocks.partitions.length))
-      val (leftDestinations, rightDestinations) = simulateMultiply(other, resultPartitioner)
+      val (leftDestinations, rightDestinations)
+        = simulateMultiply(other, resultPartitioner, numMidDimSplits)
       // Each block of A must be multiplied with the corresponding blocks in the columns of B.
       val flatA = blocks.flatMap { case ((blockRowIndex, blockColIndex), block) =>
         val destinations = leftDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
@@ -477,7 +507,11 @@ class BlockMatrix @Since("1.3.0") (
         val destinations = rightDestinations.getOrElse((blockRowIndex, blockColIndex), Set.empty)
         destinations.map(j => (j, (blockRowIndex, blockColIndex, block)))
       }
-      val newBlocks = flatA.cogroup(flatB, resultPartitioner).flatMap { case (pId, (a, b)) =>
+      val intermediatePartitioner = new Partitioner {
+        override def numPartitions: Int = resultPartitioner.numPartitions * numMidDimSplits
+        override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+      }
+      val newBlocks = flatA.cogroup(flatB, intermediatePartitioner).flatMap { case (pId, (a, b)) =>
         a.flatMap { case (leftRowIndex, leftColIndex, leftBlock) =>
           b.filter(_._1 == leftColIndex).map { case (rightRowIndex, rightColIndex, rightBlock) =>
             val C = rightBlock match {

http://git-wip-us.apache.org/repos/asf/spark/blob/1191fe26/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 61266f3..f6a9969 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -267,6 +267,15 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(sparseBM.subtract(sparseBM).toBreeze() === sparseBM.subtract(denseBM).toBreeze())
   }
 
+  def testMultiply(A: BlockMatrix, B: BlockMatrix, expectedResult: Matrix,
+      numMidDimSplits: Int): Unit = {
+    val C = A.multiply(B, numMidDimSplits)
+    val localC = C.toLocalMatrix()
+    assert(C.numRows() === A.numRows())
+    assert(C.numCols() === B.numCols())
+    assert(localC ~== expectedResult absTol 1e-8)
+  }
+
   test("multiply") {
     // identity matrix
     val blocks: Seq[((Int, Int), Matrix)] = Seq(
@@ -302,12 +311,13 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     // Try it with increased number of partitions
     val largeA = new BlockMatrix(sc.parallelize(largerAblocks, 10), 6, 4)
     val largeB = new BlockMatrix(sc.parallelize(largerBblocks, 8), 4, 4)
-    val largeC = largeA.multiply(largeB)
-    val localC = largeC.toLocalMatrix()
+
     val result = largeA.toLocalMatrix().multiply(largeB.toLocalMatrix().asInstanceOf[DenseMatrix])
-    assert(largeC.numRows() === largeA.numRows())
-    assert(largeC.numCols() === largeB.numCols())
-    assert(localC ~== result absTol 1e-8)
+
+    testMultiply(largeA, largeB, result, 1)
+    testMultiply(largeA, largeB, result, 2)
+    testMultiply(largeA, largeB, result, 3)
+    testMultiply(largeA, largeB, result, 4)
   }
 
   test("simulate multiply") {
@@ -318,7 +328,7 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     val B = new BlockMatrix(rdd, colPerPart, rowPerPart)
     val resultPartitioner = GridPartitioner(gridBasedMat.numRowBlocks, B.numColBlocks,
       math.max(numPartitions, 2))
-    val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner)
+    val (destinationsA, destinationsB) = gridBasedMat.simulateMultiply(B, resultPartitioner, 1)
     assert(destinationsA((0, 0)) === Set(0))
     assert(destinationsA((0, 1)) === Set(2))
     assert(destinationsA((1, 0)) === Set(0))

