You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/01/25 20:00:24 UTC

[spark] branch branch-3.0 updated: [SPARK-34187][SS][3.0] Use available offset range obtained during polling when checking offset validation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4693523  [SPARK-34187][SS][3.0] Use available offset range obtained during polling when checking offset validation
4693523 is described below

commit 469352358b9248679b615328536d424de20ab92e
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Jan 25 11:59:17 2021 -0800

    [SPARK-34187][SS][3.0] Use available offset range obtained during polling when checking offset validation
    
    ### What changes were proposed in this pull request?
    
    This patch uses the available offset range obtained during polling Kafka records to do offset validation check.
    
    ### Why are the changes needed?
    
    We support non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we do offset validation by checking if the offset is in available offset range. But currently we obtain latest available offset range to do the check. It looks not correct as the available offset range could be changed during the batch, so the available offset range is different than the one when we polling the records from Kafka.
    
    It is possible that an offset is valid when polling, but at the time we do the above check, it is out of latest available offset range. We will wrongly consider it as data loss case and fail the query or drop the record.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    This should pass existing unit tests.
    
    This is hard to have unit test as the Kafka producer and the consumer is asynchronous. Further, we also need to make the offset out of new available offset range.
    
    Closes #31328 from viirya/SPARK-34187-3.0.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/kafka010/consumer/FetchedDataPool.scala    |  5 ++--
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  | 29 +++++++++++++++-------
 .../kafka010/consumer/FetchedDataPoolSuite.scala   | 12 ++++-----
 3 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
index 6174bfb..3e68317 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
-import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -174,7 +174,8 @@ private[consumer] object FetchedDataPool {
       val emptyData = FetchedData(
         ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
         UNKNOWN_OFFSET,
-        UNKNOWN_OFFSET)
+        UNKNOWN_OFFSET,
+        AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET))
 
       CachedFetchedData(emptyData)
     }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index df51c4f..c3142b5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -69,7 +69,7 @@ private[kafka010] class InternalKafkaConsumer(
    *                          consumer polls nothing before timeout.
    */
   def fetch(offset: Long, pollTimeoutMs: Long):
-      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+      (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = {
 
     // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
     seek(offset)
@@ -78,7 +78,8 @@ private[kafka010] class InternalKafkaConsumer(
     logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
     val offsetAfterPoll = consumer.position(topicPartition)
     logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
-    val fetchedData = (r, offsetAfterPoll)
+    val range = getAvailableOffsetRange()
+    val fetchedData = (r, offsetAfterPoll, range)
     if (r.isEmpty) {
       // We cannot fetch anything after `poll`. Two possible cases:
       // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
@@ -86,7 +87,6 @@ private[kafka010] class InternalKafkaConsumer(
       // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
       // - Fetched something but all of them are not invisible. This is a valid case and let the
       //   caller handles this.
-      val range = getAvailableOffsetRange()
       if (offset < range.earliest || offset >= range.latest) {
         throw new OffsetOutOfRangeException(
           Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
@@ -144,14 +144,17 @@ private[kafka010] class InternalKafkaConsumer(
 private[consumer] case class FetchedData(
     private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
     private var _nextOffsetInFetchedData: Long,
-    private var _offsetAfterPoll: Long) {
+    private var _offsetAfterPoll: Long,
+    private var _availableOffsetRange: AvailableOffsetRange) {
 
   def withNewPoll(
       records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
-      offsetAfterPoll: Long): FetchedData = {
+      offsetAfterPoll: Long,
+      availableOffsetRange: AvailableOffsetRange): FetchedData = {
     this._records = records
     this._nextOffsetInFetchedData = UNKNOWN_OFFSET
     this._offsetAfterPoll = offsetAfterPoll
+    this._availableOffsetRange = availableOffsetRange
     this
   }
 
@@ -178,6 +181,7 @@ private[consumer] case class FetchedData(
     _records = ju.Collections.emptyListIterator()
     _nextOffsetInFetchedData = UNKNOWN_OFFSET
     _offsetAfterPoll = UNKNOWN_OFFSET
+    _availableOffsetRange = AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET)
   }
 
   /**
@@ -190,6 +194,13 @@ private[consumer] case class FetchedData(
    * Returns the next offset to poll after draining the pre-fetched records.
    */
   def offsetAfterPoll: Long = _offsetAfterPoll
+
+  /**
+   * Returns the tuple of earliest and latest offsets that is the available offset range when
+   * polling the records.
+   */
+  def availableOffsetRange: (Long, Long) =
+    (_availableOffsetRange.earliest, _availableOffsetRange.latest)
 }
 
 /**
@@ -469,8 +480,8 @@ private[kafka010] class KafkaDataConsumer(
       // In general, Kafka uses the specified offset as the start point, and tries to fetch the next
       // available offset. Hence we need to handle offset mismatch.
       if (record.offset > offset) {
-        val range = consumer.getAvailableOffsetRange()
-        if (range.earliest <= offset) {
+        val (earliestOffset, _) = fetchedData.availableOffsetRange
+        if (earliestOffset <= offset) {
           // `offset` is still valid but the corresponding message is invisible. We should skip it
           // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
           // `fetchRecord` can just return `record` directly.
@@ -519,8 +530,8 @@ private[kafka010] class KafkaDataConsumer(
       fetchedData: FetchedData,
       offset: Long,
       pollTimeoutMs: Long): Unit = {
-    val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
-    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+    val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs)
+    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
   }
 
   private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
index 23bab5c..09d50ef0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
-import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ManualClock
 
@@ -69,7 +69,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     dataList.foreach { case (key, data) =>
@@ -91,7 +91,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
 
     val data = dataPool.acquire(cacheKey, 0)
-    data.withNewPoll(testRecords(0, 5).listIterator, 5)
+    data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => data.next() }
 
@@ -130,14 +130,14 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assert(getCache(dataPool)(cacheKey)(1).inUse)
 
     // reading from task 1
-    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
+    dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
 
     (0 to 3).foreach { _ => dataFromTask1.next() }
 
     dataPool.release(cacheKey, dataFromTask1)
 
     // reading from task 2
-    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
+    dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30, AvailableOffsetRange(0, 29))
 
     (0 to 5).foreach { _ => dataFromTask2.next() }
 
@@ -189,7 +189,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
     assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)
 
     dataList.map { case (_, data) =>
-      data.withNewPoll(testRecords(0, 5).listIterator, 5)
+      data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
     }
 
     val dataToEvict = dataList.take(3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org