You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/24 19:51:45 UTC

[spark] branch branch-3.1 updated: [SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 37e05ed  [SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation
37e05ed is described below

commit 37e05ed48d0aa2f807395863581a9f85e273e4f5
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun Jan 24 11:50:56 2021 -0800

    [SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation
    
    ### What changes were proposed in this pull request?
    
    This patch uses the available offset range obtained during polling Kafka records to do offset validation check.
    
    ### Why are the changes needed?
    
    We support non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we do offset validation by checking if the offset is in available offset range. But currently we obtain latest available offset range to do the check. It looks not correct as the available offset range could be changed during the batch, so the available offset range is different than the one when we polling the records from Kafka.
    
    It is possible that an offset is valid when polling, but at the time we do the above check, it is out of latest available offset range. We will wrongly consider it as data loss case and fail the query or drop the record.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    This should pass existing unit tests.
    
    This is hard to have unit test as the Kafka producer and the consumer is asynchronous. Further, we also need to make the offset out of new available offset range.
    
    Closes #31275 from viirya/SPARK-34187.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit ab6c0e5d10e594318d6421ee6c099f90dbda3a02)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/kafka010/consumer/FetchedDataPool.scala    |  5 ++--
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  | 30 +++++++++++++++-------
 .../kafka010/consumer/FetchedDataPoolSuite.scala   | 12 ++++-----
 3 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 6174bfb..3e68317 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
-import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -174,7 +174,8 @@ private[consumer] object FetchedDataPool {
       val emptyData = FetchedData(
         ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
         UNKNOWN_OFFSET,
-        UNKNOWN_OFFSET)
+        UNKNOWN_OFFSET,
+        AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET))
 
       CachedFetchedData(emptyData)
     }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 54f6f91..5c92d11 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -71,7 +71,7 @@ private[kafka010] class InternalKafkaConsumer(
    *                          consumer polls nothing before timeout.
    */
   def fetch(offset: Long, pollTimeoutMs: Long):
-      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = {
 
     // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
     seek(offset)
@@ -80,7 +80,8 @@ private[kafka010] class InternalKafkaConsumer(
     logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
     val offsetAfterPoll = consumer.position(topicPartition)
     logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
-    val fetchedData = (r, offsetAfterPoll)
+    val range = getAvailableOffsetRange()
+    val fetchedData = (r, offsetAfterPoll, range)
     if (r.isEmpty) {
       // We cannot fetch anything after `poll`. Two possible cases:
       // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
@@ -88,7 +89,6 @@ private[kafka010] class InternalKafkaConsumer(
       // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
       // - Fetched something but all of them are not invisible. This is a valid case and let the
       //   caller handles this.
-      val range = getAvailableOffsetRange()
       if (offset < range.earliest || offset >= range.latest) {
         throw new OffsetOutOfRangeException(
           Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
@@ -142,18 +142,22 @@ private[kafka010] class InternalKafkaConsumer(
  *                                 should check if the pre-fetched data is still valid.
  * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
  *                           poll when `records` is drained.
+ * @param _availableOffsetRange the available offset range in Kafka when polling the records.
  */
 private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
-    private var _offsetAfterPoll: Long) {
+    private var _offsetAfterPoll: Long,
+    private var _availableOffsetRange: AvailableOffsetRange) {
 
   def withNewPoll(
       records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
-      offsetAfterPoll: Long): FetchedData = {
+      offsetAfterPoll: Long,
+      availableOffsetRange: AvailableOffsetRange): FetchedData = {
     this._records = records
     this._nextOffsetInFetchedData = UNKNOWN_OFFSET
     this._offsetAfterPoll = offsetAfterPoll
+    this._availableOffsetRange = availableOffsetRange
     this
   }
 
@@ -180,6 +184,7 @@ private[consumer] case class FetchedData(
     _records = ju.Collections.emptyListIterator()
     _nextOffsetInFetchedData = UNKNOWN_OFFSET
     _offsetAfterPoll = UNKNOWN_OFFSET
+    _availableOffsetRange = AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET)
   }
 
   /**
@@ -192,6 +197,13 @@ private[consumer] case class FetchedData(
    * Returns the next offset to poll after draining the pre-fetched records.
    */
   def offsetAfterPoll: Long = _offsetAfterPoll
+
+  /**
+   * Returns the tuple of earliest and latest offsets that is the available offset range when
+   * polling the records.
+   */
+  def availableOffsetRange: (Long, Long) =
+    (_availableOffsetRange.earliest, _availableOffsetRange.latest)
 }
 
 /**
@@ -474,8 +486,8 @@ private[kafka010] class KafkaDataConsumer(
       // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
       // available offset. Hence we need to handle offset mismatch.
       if (record.offset > offset) {
-        val range = consumer.getAvailableOffsetRange()
-        if (range.earliest <= offset) {
+        val (earliestOffset, _) = fetchedData.availableOffsetRange
+        if (earliestOffset <= offset) {
           // `offset` is still valid but the corresponding message is invisible. We should skip it
           // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
           // `fetchRecord` can just return `record` directly.
@@ -524,8 +536,8 @@ private[kafka010] class KafkaDataConsumer(
       fetchedData: FetchedData,
       offset: Long,
       pollTimeoutMs: Long): Unit = {
-    val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
-    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+    val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs)
+    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
   }
 
   private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
index 23bab5c..09d50ef0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
-import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock
 
@@ -69,7 +69,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     dataList.foreach { case (key, data) =>
@@ -91,7 +91,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
 
     val data = dataPool.acquire(cacheKey, 0)
-    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+    data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => data.next() }
 
@@ -130,14 +130,14 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assert(getCache(dataPool)(cacheKey)(1).inUse)
 
     // reading from task 1
-    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
+    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => dataFromTask1.next() }
 
     dataPool.release(cacheKey, dataFromTask1)
 
     // reading from task 2
-    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
+    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30, AvailableOffsetRange(0, 29))
 
     (0 to 5).foreach { _ => dataFromTask2.next() }
 
@@ -189,7 +189,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     val dataToEvict = dataList.take(3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org