You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/01/31 02:15:28 UTC

[spark] branch master updated: [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f56ba37  [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1
f56ba37 is described below

commit f56ba37d8bf618f2bef23d808e0fc5704261b139
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Thu Jan 30 18:14:50 2020 -0800

    [SPARK-30656][SS] Support the "minPartitions" option in Kafka batch source and streaming source v1
    
    ### What changes were proposed in this pull request?
    
    - Add `minPartitions` support for Kafka Streaming V1 source.
    - Add `minPartitions` support for Kafka batch V1 and V2 source.
    - There is a lot of refactoring (moving code into KafkaOffsetReader) so that the code can be reused.
    
    ### Why are the changes needed?
    
    Right now, the "minPartitions" option only works in Kafka streaming source v2. It would be great if we could support it in batch and streaming source v1 (v1 is the fallback mode when a user hits a regression in v2) as well.
    
    ### Does this PR introduce any user-facing change?
    
    Yep. The `minPartitions` option is now supported in Kafka batch and streaming queries for both data source V1 and V2.
    
    ### How was this patch tested?
    
    New unit tests are added to test "minPartitions".
    
    Closes #27388 from zsxwing/kafka-min-partitions.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 docs/structured-streaming-kafka-integration.md     |   2 +-
 .../org/apache/spark/sql/kafka010/KafkaBatch.scala |  32 +----
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  75 +---------
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  20 +--
 .../spark/sql/kafka010/KafkaOffsetReader.scala     | 156 +++++++++++++++++++++
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  32 +----
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  64 +--------
 .../apache/spark/sql/kafka010/KafkaSourceRDD.scala |  21 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  29 ++++
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 107 +++++++++-----
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      | 139 ++++++++++++++++++
 .../spark/sql/kafka010/KafkaRelationSuite.scala    |  22 +++
 12 files changed, 448 insertions(+), 251 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 0820b38..a1eeee5 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -469,7 +469,7 @@ The following configurations are optional:
   <td>minPartitions</td>
   <td>int</td>
   <td>none</td>
-  <td>streaming</td>
+  <td>streaming and batch</td>
   <td>Desired minimum number of partitions to read from Kafka.
   By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
   If you set this option to a value greater than your topicPartitions, Spark will divvy up large
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
index 3006770..9ad083f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala
@@ -57,36 +57,12 @@ private[kafka010] class KafkaBatch(
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
     // Leverage the KafkaReader to obtain the relevant partition offsets
-    val (fromPartitionOffsets, untilPartitionOffsets) = {
-      try {
-        (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, isStartingOffsets = true),
-          kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, isStartingOffsets = false))
-      } finally {
-        kafkaOffsetReader.close()
-      }
+    val offsetRanges: Seq[KafkaOffsetRange] = try {
+      kafkaOffsetReader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    } finally {
+      kafkaOffsetReader.close()
     }
 
-    // Obtain topicPartitions in both from and until partition offset, ignoring
-    // topic partitions that were added and/or deleted between the two above calls.
-    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
-      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic())
-      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
-      val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
-      throw new IllegalStateException("different topic partitions " +
-        s"for starting offsets topics[${fromTopics}] and " +
-        s"ending offsets topics[${untilTopics}]")
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp,
-        // This should not happen since topicPartitions contains all partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset"))
-      val untilOffset = untilPartitionOffsets(tp)
-      KafkaOffsetRange(tp, fromOffset, untilOffset, None)
-    }.toArray
-
     val executorKafkaParams =
       KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
     offsetRanges.map { range =>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 01f6ba4..844c963 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -66,8 +66,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-
   private var endPartitionOffsets: KafkaSourceOffset = _
 
   /**
@@ -94,57 +92,11 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionInitialOffsets")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
-    }
-
-    // Find deleted partitions, and report data loss if required
-    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val message =
-        if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-        } else {
-          s"$deletedPartitions are gone. Some data may have been missed."
-        }
-      reportDataLoss(message)
-    }
-
-    // Use the end partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = endPartitionOffsets
-    untilOffsets.foreach { case (tp, untilOffset) =>
-      fromOffsets.get(tp).foreach { fromOffset =>
-        if (untilOffset < fromOffset) {
-          reportDataLoss(s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset, some data may have been missed")
-        }
-      }
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = fromOffsets,
-      untilOffsets = untilOffsets,
-      executorLocations = getSortedExecutorList())
+    val offsetRanges = kafkaOffsetReader.getOffsetRangesFromResolvedOffsets(
+      startPartitionOffsets,
+      endPartitionOffsets,
+      reportDataLoss
+    )
 
     // Generate factories based on the offset ranges
     offsetRanges.map { range =>
@@ -242,23 +194,6 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
-  private def getSortedExecutorList(): Array[String] = {
-
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
   /**
    * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
    * Otherwise, just log a warning.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index ead4542..f7183f7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -41,14 +41,9 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
-      fromOffsets: PartitionOffsetMap,
-      untilOffsets: PartitionOffsetMap,
+      ranges: Seq[KafkaOffsetRange],
       executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = {
-    val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet)
-
-    val offsetRanges = partitionsToRead.toSeq.map { tp =>
-      KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None)
-    }.filter(_.size > 0)
+    val offsetRanges = ranges.filter(_.size > 0)
 
     // If minPartitions not set or there are enough partitions to satisfy minPartitions
     if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) {
@@ -106,6 +101,13 @@ private[kafka010] case class KafkaOffsetRange(
     topicPartition: TopicPartition,
     fromOffset: Long,
     untilOffset: Long,
-    preferredLoc: Option[String]) {
-  lazy val size: Long = untilOffset - fromOffset
+    preferredLoc: Option[String] = None) {
+  def topic: String = topicPartition.topic
+  def partition: Int = topicPartition.partition
+  /**
+   * The estimated size of messages in the range. It may be different than the real number of
+   * messages due to log compaction or transaction metadata. It should not be used to provide
+   * answers directly.
+   */
+  def size: Long = untilOffset - fromOffset
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 0179f4d..216e74a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -29,7 +29,9 @@ import scala.util.control.NonFatal
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetAndTimestamp}
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
 
@@ -91,9 +93,27 @@ private[kafka010] class KafkaOffsetReader(
   private[kafka010] val maxOffsetFetchAttempts =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
 
+  /**
+   * Number of partitions to read from Kafka. If this value is greater than the number of Kafka
+   * topicPartitions, we will split up  the read tasks of the skewed partitions to multiple Spark
+   * tasks. The number of Spark tasks will be *approximately* `numPartitions`. It can be less or
+   * more depending on rounding errors or Kafka partitions that didn't receive any new data.
+   */
+  private val minPartitions =
+    readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+
+  private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+
   private[kafka010] val offsetFetchAttemptIntervalMs =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
 
+  /**
+   * Whether we should divide Kafka TopicPartitions with a lot of data into smaller Spark tasks.
+   */
+  private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = {
+    minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+  }
+
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
     nextId += 1
@@ -372,6 +392,142 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
+  /**
+   * Return the offset ranges for a Kafka batch query. If `minPartitions` is set, this method may
+   * split partitions to respect it. Since offsets can be early and late binding which are evaluated
+   * on the executors, in order to divvy up the partitions we need to perform some substitutions. We
+   * don't want to send exact offsets to the executors, because data may age out before we can
+   * consume the data. This method makes some approximate splitting, and replaces the special offset
+   * values in the final output.
+   */
+  def getOffsetRangesFromUnresolvedOffsets(
+      startingOffsets: KafkaOffsetRangeLimit,
+      endingOffsets: KafkaOffsetRangeLimit): Seq[KafkaOffsetRange] = {
+    val fromPartitionOffsets = fetchPartitionOffsets(startingOffsets, isStartingOffsets = true)
+    val untilPartitionOffsets = fetchPartitionOffsets(endingOffsets, isStartingOffsets = false)
+
+    // Obtain topicPartitions in both from and until partition offset, ignoring
+    // topic partitions that were added and/or deleted between the two above calls.
+    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
+      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic())
+      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
+      val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
+      throw new IllegalStateException("different topic partitions " +
+        s"for starting offsets topics[${fromTopics}] and " +
+        s"ending offsets topics[${untilTopics}]")
+    }
+
+    // Calculate offset ranges
+    val offsetRangesBase = untilPartitionOffsets.keySet.map { tp =>
+      val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+        // This should not happen since topicPartitions contains all partitions not in
+        // fromPartitionOffsets
+        throw new IllegalStateException(s"$tp doesn't have a from offset")
+      }
+      val untilOffset = untilPartitionOffsets(tp)
+      KafkaOffsetRange(tp, fromOffset, untilOffset, None)
+    }.toSeq
+
+    if (shouldDivvyUpLargePartitions(offsetRangesBase.size)) {
+      val fromOffsetsMap =
+        offsetRangesBase.map(range => (range.topicPartition, range.fromOffset)).toMap
+      val untilOffsetsMap =
+        offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
+
+      // No need to report data loss here
+      val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
+      val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+      val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
+        KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
+      }
+      val divvied = rangeCalculator.getRanges(ranges).groupBy(_.topicPartition)
+      divvied.flatMap { case (tp, splitOffsetRanges) =>
+        if (splitOffsetRanges.length == 1) {
+          Seq(KafkaOffsetRange(tp, fromOffsetsMap(tp), untilOffsetsMap(tp), None))
+        } else {
+          // the list can't be empty
+          val first = splitOffsetRanges.head.copy(fromOffset = fromOffsetsMap(tp))
+          val end = splitOffsetRanges.last.copy(untilOffset = untilOffsetsMap(tp))
+          Seq(first) ++ splitOffsetRanges.drop(1).dropRight(1) :+ end
+        }
+      }.toArray.toSeq
+    } else {
+      offsetRangesBase
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * Return the offset ranges for a Kafka streaming batch. If `minPartitions` is set, this method
+   * may split partitions to respect it. If any data lost issue is detected, `reportDataLoss` will
+   * be called.
+   */
+  def getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets: PartitionOffsetMap,
+      untilPartitionOffsets: PartitionOffsetMap,
+      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+    }
+
+    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      } else {
+        s"$deletedPartitions are gone. Some data may have been missed."
+      }
+      reportDataLoss(message)
+    }
+
+    // Use the until partitions to calculate offset ranges to ignore partitions that have
+    // been deleted
+    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = untilPartitionOffsets
+    val ranges = topicPartitions.map { tp =>
+      val fromOffset = fromOffsets(tp)
+      val untilOffset = untilOffsets(tp)
+      if (untilOffset < fromOffset) {
+        reportDataLoss(s"Partition $tp's offset was changed from " +
+          s"$fromOffset to $untilOffset, some data may have been missed")
+      }
+      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
+    }
+    rangeCalculator.getRanges(ranges, getSortedExecutorList)
+  }
+
   private def partitionsAssignedToConsumer(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
       fetchingEarliestOffset: Boolean = false)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 61479c9..413a0c4 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -66,36 +66,12 @@ private[kafka010] class KafkaRelation(
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
     // Leverage the KafkaReader to obtain the relevant partition offsets
-    val (fromPartitionOffsets, untilPartitionOffsets) = {
-      try {
-        (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets, isStartingOffsets = true),
-          kafkaOffsetReader.fetchPartitionOffsets(endingOffsets, isStartingOffsets = false))
-      } finally {
-        kafkaOffsetReader.close()
-      }
+    val offsetRanges: Seq[KafkaOffsetRange] = try {
+      kafkaOffsetReader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    } finally {
+      kafkaOffsetReader.close()
     }
 
-    // Obtain topicPartitions in both from and until partition offset, ignoring
-    // topic partitions that were added and/or deleted between the two above calls.
-    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
-      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic())
-      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
-      val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
-      throw new IllegalStateException("different topic partitions " +
-        s"for starting offsets topics[${fromTopics}] and " +
-        s"ending offsets topics[${untilTopics}]")
-    }
-
-    // Calculate offset ranges
-    val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp,
-        // This should not happen since topicPartitions contains all partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset"))
-      val untilOffset = untilPartitionOffsets(tp)
-      KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None)
-    }.toArray
-
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index e1392b6..f0b3bf1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -210,66 +210,10 @@ private[kafka010] class KafkaSource(
         initialPartitionOffsets
     }
 
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionOffsets = kafkaReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionOffsets")
-    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
-    }
-
-    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val sortedExecutors = getSortedExecutorList(sc)
-    val numExecutors = sortedExecutors.length
-    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
-
-    // Calculate offset ranges
-    val offsetRanges = topicPartitions.map { tp =>
-      val fromOffset = fromPartitionOffsets.getOrElse(tp, newPartitionOffsets.getOrElse(tp, {
-        // This should not happen since newPartitionOffsets contains all partitions not in
-        // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset")
-      }))
-      val untilOffset = untilPartitionOffsets(tp)
-      val preferredLoc = if (numExecutors > 0) {
-        // This allows cached KafkaConsumers in the executors to be re-used to read the same
-        // partition in every batch.
-        Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
-      } else None
-      KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
-    }.filter { range =>
-      if (range.untilOffset < range.fromOffset) {
-        reportDataLoss(s"Partition ${range.topicPartition}'s offset was changed from " +
-          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
-        false
-      } else {
-        true
-      }
-    }.toArray
+    val offsetRanges = kafkaReader.getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets,
+      untilPartitionOffsets,
+      reportDataLoss)
 
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val rdd = if (includeHeaders) {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index f1f3871..5475864 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -28,21 +27,9 @@ import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
 
-/** Offset range that one partition of the KafkaSourceRDD has to read */
-private[kafka010] case class KafkaSourceRDDOffsetRange(
-    topicPartition: TopicPartition,
-    fromOffset: Long,
-    untilOffset: Long,
-    preferredLoc: Option[String]) {
-  def topic: String = topicPartition.topic
-  def partition: Int = topicPartition.partition
-  def size: Long = untilOffset - fromOffset
-}
-
-
 /** Partition of the KafkaSourceRDD */
 private[kafka010] case class KafkaSourceRDDPartition(
-  index: Int, offsetRange: KafkaSourceRDDOffsetRange) extends Partition
+  index: Int, offsetRange: KafkaOffsetRange) extends Partition
 
 
 /**
@@ -58,7 +45,7 @@ private[kafka010] case class KafkaSourceRDDPartition(
 private[kafka010] class KafkaSourceRDD(
     sc: SparkContext,
     executorKafkaParams: ju.Map[String, Object],
-    offsetRanges: Seq[KafkaSourceRDDOffsetRange],
+    offsetRanges: Seq[KafkaOffsetRange],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean)
   extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]](sc, Nil) {
@@ -130,7 +117,7 @@ private[kafka010] class KafkaSourceRDD(
     }
   }
 
-  private def resolveRange(consumer: KafkaDataConsumer, range: KafkaSourceRDDOffsetRange) = {
+  private def resolveRange(consumer: KafkaDataConsumer, range: KafkaOffsetRange) = {
     if (range.fromOffset < 0 || range.untilOffset < 0) {
       // Late bind the offset range
       val availableOffsetRange = consumer.getAvailableOffsetRange()
@@ -148,7 +135,7 @@ private[kafka010] class KafkaSourceRDD(
       } else {
         range.untilOffset
       }
-      KafkaSourceRDDOffsetRange(range.topicPartition,
+      KafkaOffsetRange(range.topicPartition,
         fromOffset, untilOffset, range.preferredLoc)
     } else {
       range
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 3ee59e5..468b21c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1063,6 +1063,35 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   test("SPARK-27494: read kafka record containing null key/values.") {
     testNullableKeyValue(Trigger.ProcessingTime(100))
   }
+
+  test("SPARK-30656: minPartitions") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("20"), Some(2))
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .option("minPartitions", "6")
+      .load()
+      .select($"value".as[String])
+    val q = ds.writeStream.foreachBatch { (batch: Dataset[String], _: Long) =>
+      val partitions = batch.rdd.collectPartitions()
+      assert(partitions.length >= 6)
+      assert(partitions.flatten.toSet === (0 to 20).map(_.toString).toSet): Unit
+    }.start()
+    try {
+      q.processAllAvailable()
+    } finally {
+      q.stop()
+    }
+  }
 }
 
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 2374a81..5d010cd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -34,31 +34,16 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
     }
   }
 
-
   test("with no minPartition: N TopicPartitions to N offset ranges") {
     val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 2)) ==
-      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
-
-    assert(
-      calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) ==
+        Seq(KafkaOffsetRange(tp1, 1, 2))) ==
       Seq(KafkaOffsetRange(tp1, 1, 2, None)))
 
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2)) ==
-      Seq(KafkaOffsetRange(tp1, 1, 2, None)))
-
-    assert(
-      calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2),
+        Seq(KafkaOffsetRange(tp1, 1, 2)),
         executorLocations = Seq("location")) ==
       Seq(KafkaOffsetRange(tp1, 1, 2, Some("location"))))
   }
@@ -67,16 +52,19 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
     val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 1))) ===
       Seq(KafkaOffsetRange(tp1, 1, 2, None)))
   }
 
   testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
-        untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2),
+          KafkaOffsetRange(tp2, 1, 2),
+          KafkaOffsetRange(tp3, 1, 2))) ===
       Seq(
         KafkaOffsetRange(tp1, 1, 2, None),
         KafkaOffsetRange(tp2, 1, 2, None),
@@ -86,18 +74,16 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 5)) ==
-      Seq(
-        KafkaOffsetRange(tp1, 1, 2, None),
-        KafkaOffsetRange(tp1, 2, 3, None),
-        KafkaOffsetRange(tp1, 3, 4, None),
-        KafkaOffsetRange(tp1, 4, 5, None)))
+        Seq(KafkaOffsetRange(tp1, 1, 5))) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2, None),
+          KafkaOffsetRange(tp1, 2, 3, None),
+          KafkaOffsetRange(tp1, 3, 4, None),
+          KafkaOffsetRange(tp1, 4, 5, None)))
 
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 5),
+        Seq(KafkaOffsetRange(tp1, 1, 5)),
         executorLocations = Seq("location")) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 2, None),
@@ -109,8 +95,9 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1),
-        untilOffsets = Map(tp1 -> 5, tp2 -> 21)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5),
+          KafkaOffsetRange(tp2, 1, 21))) ===
         Seq(
           KafkaOffsetRange(tp1, 1, 5, None),
           KafkaOffsetRange(tp2, 1, 7, None),
@@ -118,11 +105,51 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp2, 14, 21, None)))
   }
 
+  testWithMinPartitions("SPARK-30656: ignore empty ranges and split the rest", 4) { calc =>
+    // tp1 is empty (from == until); the expected result holds only tp2, cut into 4 ranges of 5.
+    val input = Seq(
+      KafkaOffsetRange(tp1, 1, 1),
+      KafkaOffsetRange(tp2, 1, 21))
+    val expected = Seq(
+      KafkaOffsetRange(tp2, 1, 6, None),
+      KafkaOffsetRange(tp2, 6, 11, None),
+      KafkaOffsetRange(tp2, 11, 16, None),
+      KafkaOffsetRange(tp2, 16, 21, None))
+    assert(calc.getRanges(input) === expected)
+  }
+
+  testWithMinPartitions(
+      "SPARK-30656: N very skewed TopicPartitions to M offset ranges",
+      3) { calc =>
+    // tp1 holds a single offset and stays whole; tp2's 1000 offsets are expected in 3 pieces.
+    val input = Seq(
+      KafkaOffsetRange(tp1, 1, 2),
+      KafkaOffsetRange(tp2, 1, 1001))
+    val expected = Seq(
+      KafkaOffsetRange(tp1, 1, 2, None),
+      KafkaOffsetRange(tp2, 1, 334, None),
+      KafkaOffsetRange(tp2, 334, 667, None),
+      KafkaOffsetRange(tp2, 667, 1001, None))
+    assert(calc.getRanges(input) === expected)
+  }
+
+  testWithMinPartitions(
+      "SPARK-30656: minPartitions less than the length of topic partitions",
+      1) { calc =>
+    // Both partitions are non-empty; each is expected back as a single, unsplit range.
+    val input = Seq(
+      KafkaOffsetRange(tp1, 1, 5),
+      KafkaOffsetRange(tp2, 1, 21))
+    val expected = Seq(
+      KafkaOffsetRange(tp1, 1, 5, None),
+      KafkaOffsetRange(tp2, 1, 21, None))
+    assert(calc.getRanges(input) === expected)
+  }
+
   testWithMinPartitions("range inexact multiple of minPartitions", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1),
-        untilOffsets = Map(tp1 -> 11)) ==
+        Seq(KafkaOffsetRange(tp1, 1, 11))) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 4, None),
           KafkaOffsetRange(tp1, 4, 7, None),
@@ -132,8 +159,10 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   testWithMinPartitions("empty ranges ignored", 3) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1),
-        untilOffsets = Map(tp1 -> 5, tp2 -> 21, tp3 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5),
+          KafkaOffsetRange(tp2, 1, 21),
+          KafkaOffsetRange(tp3, 1, 1))) ==
         Seq(
           KafkaOffsetRange(tp1, 1, 5, None),
           KafkaOffsetRange(tp2, 1, 7, None),
@@ -144,8 +173,10 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
   testWithMinPartitions("SPARK-28489: never drop offsets", 6) { calc =>
     assert(
       calc.getRanges(
-        fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
-        untilOffsets = Map(tp1 -> 10, tp2 -> 10, tp3 -> 1)) ==
+        Seq(
+          KafkaOffsetRange(tp1, 0, 10),
+          KafkaOffsetRange(tp2, 0, 10),
+          KafkaOffsetRange(tp3, 0, 1))) ==
         Seq(
           KafkaOffsetRange(tp1, 0, 3, None),
           KafkaOffsetRange(tp1, 3, 6, None),
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
new file mode 100644
index 0000000..ad22a56
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.kafka010.KafkaOffsetRangeLimit.{EARLIEST, LATEST}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Checks the offset ranges that [[KafkaOffsetReader]] produces when "minPartitions" is set,
+ * using the Kafka test fixture that is set up in `beforeAll`.
+ */
+class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with KafkaTest {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  private val topicId = new AtomicInteger(0)
+
+  /** Returns a topic name this suite instance has not handed out before. */
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Builds a reader subscribed to `topic`. "minPartitions" is passed as a reader option only
+   * when `minPartitions` is defined. The caller is responsible for closing the returned reader.
+   */
+  private def createKafkaReader(topic: String, minPartitions: Option[Int]): KafkaOffsetReader = {
+    new KafkaOffsetReader(
+      SubscribeStrategy(Seq(topic)),
+      KafkaSourceProvider.kafkaParamsForDriver(
+        Map("bootstrap.servers" -> testUtils.brokerAddress)),
+      CaseInsensitiveMap(
+        minPartitions.map(m => Map("minPartitions" -> m.toString)).getOrElse(Map.empty)),
+      UUID.randomUUID().toString
+    )
+  }
+
+  /**
+   * Runs `body` with a reader from `createKafkaReader` and closes that reader afterwards, even
+   * when `body` throws, so that no reader is leaked from one test into the next.
+   */
+  private def withKafkaReader[T](topic: String, minPartitions: Option[Int])(
+      body: KafkaOffsetReader => T): T = {
+    val reader = createKafkaReader(topic, minPartitions)
+    try body(reader) finally reader.close()
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - using specific offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 10).map(_.toString).toArray, Some(0))
+    val tp = new TopicPartition(topic, 0)
+    val startingOffsets = SpecificOffsetRangeLimit(Map(tp -> 1))
+    val endingOffsets = SpecificOffsetRangeLimit(Map(tp -> 4))
+    // Offsets [1, 4) with minPartitions = 3 are expected as three single-offset ranges.
+    val offsetRanges = withKafkaReader(topic, minPartitions = Some(3)) { reader =>
+      reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    }
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp, 1, 2, None),
+      KafkaOffsetRange(tp, 2, 3, None),
+      KafkaOffsetRange(tp, 3, 4, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - using special offsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(0))
+    val tp = new TopicPartition(topic, 0)
+    val startingOffsets = EarliestOffsetRangeLimit
+    val endingOffsets = LatestOffsetRangeLimit
+    // Outer bounds are expected to stay EARLIEST/LATEST; only inner split points are resolved.
+    val offsetRanges = withKafkaReader(topic, minPartitions = Some(3)) { reader =>
+      reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    }
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp, EARLIEST, 1, None),
+      KafkaOffsetRange(tp, 1, 2, None),
+      KafkaOffsetRange(tp, 2, LATEST, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - multiple topic partitions") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (0 until 100).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+
+    val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> EARLIEST))
+    val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
+    // tp1 holds 100 records and tp2 only 4: tp1 is expected in three ranges, tp2 in one.
+    val offsetRanges = withKafkaReader(topic, minPartitions = Some(3)) { reader =>
+      reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets)
+    }
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp2, EARLIEST, 3, None),
+      KafkaOffsetRange(tp1, EARLIEST, 33, None),
+      KafkaOffsetRange(tp1, 33, 66, None),
+      KafkaOffsetRange(tp1, 66, LATEST, None)))
+  }
+
+  test("SPARK-30656: getOffsetRangesFromResolvedOffsets") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (0 until 100).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+
+    val fromPartitionOffsets = Map(tp1 -> 0L, tp2 -> 0L)
+    val untilPartitionOffsets = Map(tp1 -> 100L, tp2 -> 3L)
+    // Offsets are already resolved here, so every bound in the expected ranges is concrete.
+    val offsetRanges = withKafkaReader(topic, minPartitions = Some(3)) { reader =>
+      reader.getOffsetRangesFromResolvedOffsets(
+        fromPartitionOffsets, untilPartitionOffsets, _ => {})
+    }
+    assert(offsetRanges === Seq(
+      KafkaOffsetRange(tp1, 0, 33, None),
+      KafkaOffsetRange(tp1, 33, 66, None),
+      KafkaOffsetRange(tp1, 66, 100, None),
+      KafkaOffsetRange(tp2, 0, 3, None)))
+  }
+}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 2c022c1..32d0561 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -597,6 +597,28 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
       checkAnswer(df, (1 to 15).map(_.toString).toDF)
     }
   }
+
+  test("SPARK-30656: minPartitions") {
+    def valuesUpTo(last: Int): Set[String] = (0 to last).map(_.toString).toSet
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("20"), Some(2))
+
+    // No start/end offsets are given here, so the defaults (earliest to latest) should apply.
+    val df = createDF(topic, Map("minPartitions" -> "6"))
+    val rdd = df.rdd
+    val firstRead = rdd.collectPartitions()
+    assert(firstRead.length >= 6)
+    assert(firstRead.flatMap(_.map(_.getString(0))).toSet === valuesUpTo(20))
+
+    // Offsets are bound late, so the already-built `rdd` and `df` must also see records 21..30.
+    testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray)
+    assert(rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet === valuesUpTo(30))
+    assert(df.rdd.collectPartitions().flatMap(_.map(_.getString(0))).toSet === valuesUpTo(30))
+  }
 }
 
 class KafkaRelationSuiteV1 extends KafkaRelationSuiteBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org