Posted to commits@spark.apache.org by rx...@apache.org on 2015/09/01 00:55:25 UTC

spark git commit: [SPARK-10341] [SQL] fix memory starving in unsafe SMJ

Repository: spark
Updated Branches:
  refs/heads/master 5b3245d6d -> 540bdee93


[SPARK-10341] [SQL] fix memory starving in unsafe SMJ

In sort-merge join (SMJ), the first ExternalSorter can consume all available memory before spilling, so the second sorter cannot even acquire its first page.

Until we have a better memory allocator, SMJ should call prepare() before calling compute() on any of its children.
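
To illustrate the contract this patch introduces, here is a minimal standalone sketch. The types (PreparableOp, SmjSketch) are hypothetical simplifications, not the real Spark classes: prepare() reserves a per-partition resource up front and stashes it, and compute() consumes a stashed argument if one exists, falling back to lazy preparation otherwise. Calling prepare() on both children before either compute() runs is what prevents the first child from grabbing everything.

import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified sketch of the prepare-before-compute contract.
class PreparableOp[M](preparePartition: () => M) {
  private val preparedArgs = ArrayBuffer.empty[M]

  // Reserve resources (e.g. a first memory page) before any compute() runs.
  def prepare(): Unit = preparedArgs += preparePartition()

  // Consume a previously prepared argument if one exists; otherwise
  // fall back to preparing lazily (the old behavior).
  def compute(): M =
    if (preparedArgs.isEmpty) preparePartition() else preparedArgs.remove(0)
}

object SmjSketch {
  def main(args: Array[String]): Unit = {
    val left  = new PreparableOp(() => "left reserves its first page")
    val right = new PreparableOp(() => "right reserves its first page")
    // The join prepares both children first, so each reserves its page
    // before either side starts sorting and consuming memory.
    left.prepare()
    right.prepare()
    println(left.compute())
    println(right.compute())
  }
}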

cc rxin JoshRosen

Author: Davies Liu <da...@databricks.com>

Closes #8511 from davies/smj_memory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bdee9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bdee9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bdee9

Branch: refs/heads/master
Commit: 540bdee93103a73736d282b95db6a8cda8f6a2b1
Parents: 5b3245d
Author: Davies Liu <da...@databricks.com>
Authored: Mon Aug 31 15:55:22 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Aug 31 15:55:22 2015 -0700

----------------------------------------------------------------------
 .../rdd/MapPartitionsWithPreparationRDD.scala   | 21 ++++++++++++++++++--
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  | 13 ++++++++++++
 .../MapPartitionsWithPreparationRDDSuite.scala  | 14 +++++++++----
 3 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index b475bd8..1f2213d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, Partitioner, TaskContext}
@@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
+  // In certain join operations, prepare can be called on the same partition multiple times.
+  // In this case, we need to ensure that each call to compute gets a separate prepare argument.
+  private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+
+  /**
+   * Prepare a partition for a single call to compute.
+   */
+  def prepare(): Unit = {
+    preparedArguments += preparePartition()
+  }
+
   /**
    * Prepare a partition before computing it from its parent.
    */
   override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
-    val preparedArgument = preparePartition()
+    val prepared =
+      if (preparedArguments.isEmpty) {
+        preparePartition()
+      } else {
+        preparedArguments.remove(0)
+      }
     val parentIterator = firstParent[T].iterator(partition, context)
-    executePartition(context, partition.index, preparedArgument, parentIterator)
+    executePartition(context, partition.index, prepared, parentIterator)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 81f40ad..b3c6439 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     super.clearDependencies()
     rdds = null
   }
+
+  /**
+   * Call the prepare method of every parent that has one.
+   * This is needed for reserving execution memory in advance.
+   */
+  protected def tryPrepareParents(): Unit = {
+    rdds.collect {
+      case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
+    }
+  }
 }
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
@@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
   }
@@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
@@ -134,6 +146,7 @@ private[spark] class ZippedPartitionsRDD4
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
+    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),

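As an aside, tryPrepareParents above relies on Seq.collect with a partial function to invoke prepare() only on parents of the preparable subtype, skipping the rest. A standalone sketch of that idiom, with hypothetical types (ParentOp, PreparableParent) rather than Spark's RDD classes:

trait ParentOp
class PreparableParent extends ParentOp {
  var prepared = false
  def prepare(): Unit = { prepared = true }
}
class PlainParent extends ParentOp

object TryPrepareDemo {
  def main(args: Array[String]): Unit = {
    val parents: Seq[ParentOp] =
      Seq(new PreparableParent, new PlainParent, new PreparableParent)
    // collect matches only PreparableParent instances and calls
    // prepare() on each; PlainParent is skipped entirely.
    parents.collect { case p: PreparableParent => p.prepare() }
    println(parents.collect { case p: PreparableParent => p.prepared })
    // prints: List(true, true)
  }
}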
http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
index c16930e..e281e81 100644
--- a/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDDSuite.scala
@@ -46,11 +46,17 @@ class MapPartitionsWithPreparationRDDSuite extends SparkFunSuite with LocalSpark
     }
 
     // Verify that the numbers are pushed in the order expected
-    val result = {
-      new MapPartitionsWithPreparationRDD[Int, Int, Unit](
-        parent, preparePartition, executePartition).collect()
-    }
+    val rdd = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
+      parent, preparePartition, executePartition)
+    val result = rdd.collect()
     assert(result === Array(10, 20, 30))
+
+    TestObject.things.clear()
+    // Zip two of these RDDs, both should be prepared before the parent is executed
+    val rdd2 = new MapPartitionsWithPreparationRDD[Int, Int, Unit](
+      parent, preparePartition, executePartition)
+    val result2 = rdd.zipPartitions(rdd2)((a, b) => a).collect()
+    assert(result2 === Array(10, 10, 20, 30, 20, 30))
   }
 
 }

