You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/08/29 07:39:17 UTC

[spark] branch master updated: [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d1be37  [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
7d1be37 is described below

commit 7d1be3710dddd446c23606c3871e28d211ad9776
Author: Andrew Olson <ao...@cerner.com>
AuthorDate: Sun Aug 29 16:38:29 2021 +0900

    [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
    
    ### What changes were proposed in this pull request?
    
    Proposing that the `KafkaOffsetRangeCalculator`'s range calculation logic be modified to exclude small (i.e. un-split) partitions from the overall proportional distribution math, in order to more reasonably divide the large partitions when they are accompanied by many small partitions, and to provide optimal behavior for cases where a `minPartitions` value is deliberately computed based on the volume of data being read.
    
    ### Why are the changes needed?
    
    While the [documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) does contain a clear disclaimer,
    
    > Please note that this configuration is like a hint: the number of Spark tasks will be **approximately** `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.
    
    there are cases where the calculated Kafka partition range splits can differ greatly from expectations. For evenly distributed data and most `minPartitions `values this would not be a major or commonly encountered concern. However when the distribution of data across partitions is very heavily skewed, somewhat surprising range split calculations can result.
    
    For example, given the following input data:
    
    - 1 partition containing 10,000 messages
    - 1,000 partitions each containing 1 message
    
    Spark processing code loading from this collection of 1,001 partitions may decide that it would like each task to read no more than 1,000 messages. Consequently, it could specify a `minPartitions` value of 1,010 — expecting the single large partition to be split into 10 equal chunks, along with the 1,000 small partitions each having their own task. That is far from what actually occurs. The `KafkaOffsetRangeCalculator` algorithm ends up splitting the large partition into 918 chunks of [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests and added new unit tests
    
    Closes #33827 from noslowerdna/SPARK-36576.
    
    Authored-by: Andrew Olson <ao...@cerner.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  | 31 ++++++++++--
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 58 ++++++++++++++++++++--
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      |  4 +-
 3 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1e9a62e..4c0620a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -33,12 +33,13 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
    * is not set or is set less than or equal the number of `topicPartitions` that we're going to
    * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
-   * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up
+   * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
    * the read tasks of the skewed partitions to multiple Spark tasks.
-   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
+   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
    *
-   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
+   * Empty (`KafkaOffsetRange.size == 0`) or invalid (`KafkaOffsetRange.size < 0`) ranges  will be
+   * dropped.
    */
   def getRanges(
       ranges: Seq[KafkaOffsetRange],
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
+
+      // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+      // in order to exclude the contents of unsplit ranges from the proportional math applied to
+      // split ranges
+      val unsplitRanges = offsetRanges.filter { range =>
+        getPartCount(range.size, totalSize, minPartitions.get) == 1
+      }
+
+      val unsplitRangeTotalSize = unsplitRanges.map(_.size).sum
+      val splitRangeTotalSize = totalSize - unsplitRangeTotalSize
+      val unsplitRangeTopicPartitions = unsplitRanges.map(_.topicPartition).toSet
+      val splitRangeMinPartitions = math.max(minPartitions.get - unsplitRanges.size, 1)
+
+      // Now we can apply the main calculation logic
       offsetRanges.flatMap { range =>
         val tp = range.topicPartition
         val size = range.size
         // number of partitions to divvy up this topic partition to
-        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+        val parts = if (unsplitRangeTopicPartitions.contains(tp)) {
+          1
+        } else {
+          getPartCount(size, splitRangeTotalSize, splitRangeMinPartitions)
+        }
         var remaining = size
         var startOffset = range.fromOffset
         (0 until parts).map { part =>
@@ -76,6 +95,10 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     }
   }
 
+  private def getPartCount(size: Long, totalSize: Long, minParts: Int): Int = {
+    math.max(math.round(size.toDouble / totalSize * minParts), 1).toInt
+  }
+
   private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
     def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 751b877..4ef019c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -106,7 +106,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set
   }
 
-  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
+  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -134,7 +134,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
 
   testWithMinPartitions(
       "SPARK-30656: N very skewed TopicPartitions to M offset ranges",
-      3) { calc =>
+      4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -170,7 +170,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp1, 7, 11, None)))
   }
 
-  testWithMinPartitions("empty ranges ignored", 3) { calc =>
+  testWithMinPartitions("empty ranges ignored", 4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -201,6 +201,58 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp3, 0, 1, None)))
   }
 
+  testWithMinPartitions(
+    "SPARK-36576: 0 small unsplit ranges and 3 large split ranges", 9) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 10000),
+          KafkaOffsetRange(tp2, 0, 15000),
+          KafkaOffsetRange(tp3, 0, 20000))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 5000, None),
+          KafkaOffsetRange(tp1, 5000, 10000, None),
+          KafkaOffsetRange(tp2, 0, 5000, None),
+          KafkaOffsetRange(tp2, 5000, 10000, None),
+          KafkaOffsetRange(tp2, 10000, 15000, None),
+          KafkaOffsetRange(tp3, 0, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 10000, None),
+          KafkaOffsetRange(tp3, 10000, 15000, None),
+          KafkaOffsetRange(tp3, 15000, 20000, None)))
+  }
+
+  testWithMinPartitions("SPARK-36576: 1 small unsplit range and 2 large split ranges", 6) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 500),
+          KafkaOffsetRange(tp2, 0, 12000),
+          KafkaOffsetRange(tp3, 0, 15001))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 500, None),
+          KafkaOffsetRange(tp2, 0, 6000, None),
+          KafkaOffsetRange(tp2, 6000, 12000, None),
+          KafkaOffsetRange(tp3, 0, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 10000, None),
+          KafkaOffsetRange(tp3, 10000, 15001, None)))
+  }
+
+  testWithMinPartitions("SPARK-36576: 2 small unsplit ranges and 1 large split range", 6) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 1),
+          KafkaOffsetRange(tp2, 0, 1),
+          KafkaOffsetRange(tp3, 0, 10000))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 1, None),
+          KafkaOffsetRange(tp2, 0, 1, None),
+          KafkaOffsetRange(tp3, 0, 2500, None),
+          KafkaOffsetRange(tp3, 2500, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 7500, None),
+          KafkaOffsetRange(tp3, 7500, 10000, None)))
+  }
+
   private val tp1 = new TopicPartition("t1", 1)
   private val tp2 = new TopicPartition("t2", 1)
   private val tp3 = new TopicPartition("t3", 1)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index d1e49b0..332db54 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -143,7 +143,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
-    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val reader = createKafkaReader(topic, minPartitions = Some(4))
 
     val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> EARLIEST))
     val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
@@ -163,7 +163,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
-    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val reader = createKafkaReader(topic, minPartitions = Some(4))
 
     val fromPartitionOffsets = Map(tp1 -> 0L, tp2 -> 0L)
     val untilPartitionOffsets = Map(tp1 -> 100L, tp2 -> 3L)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org