You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/05 13:33:17 UTC

spark git commit: [SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by the order of input partitions

Repository: spark
Updated Branches:
  refs/heads/master 2ce37b50f -> 42cf48e20


[SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by the order of input partitions

## What changes were proposed in this pull request?

The algorithm in `DefaultPartitionCoalescer.setupGroups` is responsible for picking preferred locations for coalesced partitions. It analyzes the preferred locations of input partitions. It starts by trying to create one partition for each unique location in the input. However, if the the requested number of coalesced partitions is higher that the number of unique locations, it has to pick duplicate locations.

Previously, the duplicate locations would be picked by iterating over the input partitions in order, and copying their preferred locations to coalesced partitions. If the input partitions were clustered by location, this could result in severe skew.

With the fix, instead of iterating over the list of input partitions in order, we pick them at random. It's not perfectly balanced, but it's much better.

## How was this patch tested?

Unit test reproducing the behavior was added.

Author: Ala Luszczak <al...@databricks.com>

Closes #20664 from ala/SPARK-23496.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42cf48e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42cf48e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42cf48e2

Branch: refs/heads/master
Commit: 42cf48e20cd5e47e1b7557af9c71c4eea142f10f
Parents: 2ce37b5
Author: Ala Luszczak <al...@databricks.com>
Authored: Mon Mar 5 14:33:12 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Mar 5 14:33:12 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoalescedRDD.scala     |  8 ++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 42 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 10451a3..94e7d0b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -266,17 +266,17 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
         numCreated += 1
       }
     }
-    tries = 0
     // if we don't have enough partition groups, create duplicates
     while (numCreated < targetLen) {
-      val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
-      tries += 1
+      // Copy the preferred location from a random input partition.
+      // This helps in avoiding skew when the input partitions are clustered by preferred location.
+      val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(
+        rnd.nextInt(partitionLocs.partsWithLocs.length))
       val pgroup = new PartitionGroup(Some(nxt_replica))
       groupArr += pgroup
       groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       addPartToPGroup(nxt_part, pgroup)
       numCreated += 1
-      if (tries >= partitionLocs.partsWithLocs.length) tries = 0
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e994d72..191c612 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1129,6 +1129,35 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     }.collect()
   }
 
+  test("SPARK-23496: order of input partitions can result in severe skew in coalesce") {
+    val numInputPartitions = 100
+    val numCoalescedPartitions = 50
+    val locations = Array("locA", "locB")
+
+    val inputRDD = sc.makeRDD(Range(0, numInputPartitions).toArray[Int], numInputPartitions)
+    assert(inputRDD.getNumPartitions == numInputPartitions)
+
+    val locationPrefRDD = new LocationPrefRDD(inputRDD, { (p: Partition) =>
+      if (p.index < numCoalescedPartitions) {
+        Seq(locations(0))
+      } else {
+        Seq(locations(1))
+      }
+    })
+    val coalescedRDD = new CoalescedRDD(locationPrefRDD, numCoalescedPartitions)
+
+    val numPartsPerLocation = coalescedRDD
+      .getPartitions
+      .map(coalescedRDD.getPreferredLocations(_).head)
+      .groupBy(identity)
+      .mapValues(_.size)
+
+    // Make sure the coalesced partitions are distributed fairly evenly between the two locations.
+    // This should not become flaky since the DefaultPartitionsCoalescer uses a fixed seed.
+    assert(numPartsPerLocation(locations(0)) > 0.4 * numCoalescedPartitions)
+    assert(numPartsPerLocation(locations(1)) > 0.4 * numCoalescedPartitions)
+  }
+
   // NOTE
   // Below tests calling sc.stop() have to be the last tests in this suite. If there are tests
   // running after them and if they access sc those tests will fail as sc is already closed, because
@@ -1210,3 +1239,16 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria
     groups.toArray
   }
 }
+
+/** Alters the preferred locations of the parent RDD using provided function. */
+class LocationPrefRDD[T: ClassTag](
+    @transient var prev: RDD[T],
+    val locationPicker: Partition => Seq[String]) extends RDD[T](prev) {
+  override protected def getPartitions: Array[Partition] = prev.partitions
+
+  override def compute(partition: Partition, context: TaskContext): Iterator[T] =
+    null.asInstanceOf[Iterator[T]]
+
+  override def getPreferredLocations(partition: Partition): Seq[String] =
+    locationPicker(partition)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org