Posted to commits@spark.apache.org by zs...@apache.org on 2018/12/21 18:41:56 UTC

[spark] branch master updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e76d66  [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
8e76d66 is described below

commit 8e76d6621aaddb8b73443b14ea2c6eebe9089893
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Fri Dec 21 10:41:25 2018 -0800

    [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
    
    ## What changes were proposed in this pull request?
    
    Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return the earliest offset when we request the latest offset. This causes Spark to reprocess data.
    
    As suggested in KAFKA-7703, we add a `position` call between `poll` and `seekToEnd` so that the fetch request triggered by `poll` completes before `seekToEnd` is called.
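    
    For illustration, a minimal sketch of this workaround (the `fetchLatest` method and the `consumer` value are assumptions for this example; the actual change is in the `KafkaOffsetReader.scala` diff below):
    
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.TopicPartition
    
        // Illustrative sketch only; see KafkaOffsetReader.fetchLatestOffsets for the real code.
        def fetchLatest(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Map[TopicPartition, Long] = {
          consumer.poll(0) // may trigger an async `seekToBeginning` (KAFKA-7703)
          val partitions = consumer.assignment()
          // `position` blocks until the offset request triggered by `poll(0)` is done,
          // so the `seekToEnd` below cannot be reset by that in-flight request.
          partitions.asScala.foreach(p => consumer.position(p))
          consumer.seekToEnd(partitions)
          partitions.asScala.map(p => p -> consumer.position(p)).toMap
        }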
    
    In addition, to guard against other unknown issues, we also use the previously known offsets to audit the latest offsets returned by Kafka. If we find an incorrect offset (a latest offset less than the corresponding offset in `knownOffsets`), we retry at most `maxOffsetFetchAttempts` times.
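    
    The audit-and-retry loop is roughly the following sketch (it assumes `consumer` and `partitions` from the sketch above and `knownOffsets: Map[TopicPartition, Long]`; in the patch `knownOffsets` is an `Option`, and `maxOffsetFetchAttempts` and `offsetFetchAttemptIntervalMs` are the reader's existing config values):
    
        var attempt = 0
        var fetched: Map[TopicPartition, Long] = Map.empty
        var incorrect: Map[TopicPartition, Long] = Map.empty
        do {
          consumer.seekToEnd(partitions)
          fetched = partitions.asScala.map(p => p -> consumer.position(p)).toMap
          attempt += 1
          // A latest offset smaller than a previously known offset is suspicious:
          // either KAFKA-7703 struck or the topic was recreated. The two cases
          // cannot be told apart, so after exhausting retries we return what we got.
          incorrect = fetched.filter { case (tp, offset) =>
            knownOffsets.get(tp).exists(_ > offset)
          }
          if (incorrect.nonEmpty && attempt < maxOffsetFetchAttempts) {
            Thread.sleep(offsetFetchAttemptIntervalMs)
          }
        } while (incorrect.nonEmpty && attempt < maxOffsetFetchAttempts)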
    
    ## How was this patch tested?
    
    Jenkins
    
    Closes #23324 from zsxwing/SPARK-26267.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../sql/kafka010/KafkaContinuousReadSupport.scala  |  4 +-
 .../sql/kafka010/KafkaMicroBatchReadSupport.scala  | 19 +++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  2 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala     | 80 ++++++++++++++++++++--
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  5 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 48 +++++++++++++
 6 files changed, 145 insertions(+), 13 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28..02dfb9c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
     logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
     val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de67..b4f042e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
         case EarliestOffsetRangeLimit =>
           KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
           kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c7..6008794 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * the read tasks of the skewed partitions to multiple Spark tasks.
    * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   *
+   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
       fromOffsets: PartitionOffsetMap,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 8206669..fc443d2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.util.concurrent.{Executors, ThreadFactory}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
         // Poll to get the latest assigned partitions
         consumer.poll(0)
         val partitions = consumer.assignment()
+
+        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+        // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+        // by `poll(0)` may reset offsets that should have been set by another request.
+        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
         consumer.pause(partitions)
         assert(partitions.asScala == partitionOffsets.keySet,
           "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+   *
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this from KAFKA-7703, so we just return whatever we get from Kafka after retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have an
+         * incorrect latest offset (the offset in `knownOffsets` is greater than the one in `partitionOffsets`).
+         */
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+          val incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
+        //    consumer has a much bigger chance to hit KAFKA-7703.
+        // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            if (attempt < maxOffsetFetchAttempts) {
+              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+              Thread.sleep(offsetFetchAttemptIntervalMs)
+            }
+          }
+        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+        logDebug(s"Got latest offsets for partition : $partitionOffsets")
+        partitionOffsets
+      }
     }
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 66ec7e0..d65b3ce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val latest = kafkaReader.fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets(
+      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5ee7699..61cbb32 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -329,6 +329,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to make the test stable. When a topic is recreated
+      // while a consumer is alive, that consumer may not be able to see the recreated topic
+      // even if a fresh consumer has seen it.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should be changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"

