Posted to commits@spark.apache.org by an...@apache.org on 2015/01/21 19:42:43 UTC

spark git commit: [SPARK-4759] Fix driver hanging from coalescing partitions

Repository: spark
Updated Branches:
  refs/heads/branch-1.2 fd6266f45 -> 0c13eed25


[SPARK-4759] Fix driver hanging from coalescing partitions

The driver sometimes hangs when we coalesce RDD partitions. See the JIRA ticket for more details and a reproduction.

This is because our use of the empty string as the default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (the empty string). The intended semantics, however, are that the partition has no preferred location, and the `TaskSetManager` should schedule the task accordingly.
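
To illustrate the difference, here is a minimal standalone Scala sketch (hypothetical names, not part of this commit): with the empty-string sentinel, consumers cannot distinguish "no preference" from a host literally named `""`, whereas `Option` makes the absence explicit.

    // Standalone sketch, not Spark code; names are hypothetical.
    object PreferredLocationSketch {
      // Old approach: "" is a sentinel for "no preference", but it still
      // looks like a hostname to anything that consumes the result.
      def oldPreferredLocations(prefLoc: String): Seq[String] = List(prefLoc)

      // New approach: None means "no preference" and yields an empty Seq.
      def newPreferredLocations(prefLoc: Option[String]): Seq[String] = prefLoc.toSeq

      def main(args: Array[String]): Unit = {
        println(oldPreferredLocations(""))            // List("") -> task scheduled on host ""
        println(newPreferredLocations(None))          // List()   -> no preferred location
        println(newPreferredLocations(Some("host1"))) // List(host1)
      }
    }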

Author: Andrew Or <an...@databricks.com>

Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits:

e520d6b [Andrew Or] Oops
3ebf8bd [Andrew Or] A few comments
f370a4e [Andrew Or] Fix tests
2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location

(cherry picked from commit 4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4)
Signed-off-by: Andrew Or <an...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c13eed2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c13eed2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c13eed2

Branch: refs/heads/branch-1.2
Commit: 0c13eed25e2310f774ab4235811a1d73734b2bbb
Parents: fd6266f
Author: Andrew Or <an...@databricks.com>
Authored: Wed Dec 10 14:27:53 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jan 21 10:42:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoalescedRDD.scala     | 36 ++++++++++++--------
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  2 +-
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c13eed2/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 9fab1d7..b073eba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
  * @param preferredLocation the preferred location for this partition
  */
 private[spark] case class CoalescedRDDPartition(
-                                  index: Int,
-                                  @transient rdd: RDD[_],
-                                  parentsIndices: Array[Int],
-                                  @transient preferredLocation: String = ""
-                                  ) extends Partition {
+    index: Int,
+    @transient rdd: RDD[_],
+    parentsIndices: Array[Int],
+    @transient preferredLocation: Option[String] = None) extends Partition {
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
@@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {
-    val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
+    val loc = parents.count { p =>
+      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
+      preferredLocation.exists(parentPreferredLocations.contains)
+    }
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
 }
@@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
  * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
  */
 private[spark] class CoalescedRDD[T: ClassTag](
-                                      @transient var prev: RDD[T],
-                                      maxPartitions: Int,
-                                      balanceSlack: Double = 0.10)
+    @transient var prev: RDD[T],
+    maxPartitions: Int,
+    balanceSlack: Double = 0.10)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   override def getPartitions: Array[Partition] = {
@@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
-    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
   }
 }
 
@@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
  *
  */
 
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
 
   def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
   }
 }
 
-private[spark] case class PartitionGroup(prefLoc: String = "") {
+private case class PartitionGroup(prefLoc: Option[String] = None) {
   var arr = mutable.ArrayBuffer[Partition]()
-
   def size = arr.size
 }
+
+private object PartitionGroup {
+  def apply(prefLoc: String): PartitionGroup = {
+    require(prefLoc != "", "Preferred location must not be empty")
+    PartitionGroup(Some(prefLoc))
+  }
+}
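
For reference, a small standalone sketch (hypothetical values, not part of this commit) of the two `Option` idioms the hunks above rely on: `exists` can never match when the location is `None`, and `toSeq` turns `None` into an empty list of locations.

    // Standalone sketch, not Spark code; hosts are hypothetical.
    object OptionIdiomSketch {
      def main(args: Array[String]): Unit = {
        val parentHosts = Seq("host1", "host2")

        val noPreference: Option[String] = None
        val preference: Option[String] = Some("host1")

        // localFraction-style check: None never counts as a match.
        println(noPreference.exists(parentHosts.contains)) // false
        println(preference.exists(parentHosts.contains))   // true

        // getPreferredLocations-style conversion: None contributes nothing.
        println(noPreference.toSeq) // List()
        println(preference.toSeq)   // List(host1)
      }
    }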

http://git-wip-us.apache.org/repos/asf/spark/blob/0c13eed2/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e079ca3..8de634a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -297,7 +297,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("coalesced RDDs with locality") {
     val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
     val coal3 = data3.coalesce(3)
-    val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+    val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
     assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
 
     // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5

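The test change works because `flatMap` over an `Option`-valued function drops `None` entries, so partitions without a preferred location simply do not appear in the result. A quick standalone sketch (hypothetical data, not part of this commit):

    // Standalone sketch, not Spark code; data is hypothetical.
    object FlatMapSketch {
      def main(args: Array[String]): Unit = {
        val prefs: Array[Option[String]] = Array(Some("a"), None, Some("b"), Some("c"))
        val hosts = prefs.flatMap(p => p) // None entries vanish
        println(hosts.sorted.mkString(",")) // a,b,c
      }
    }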
