You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/08 00:53:15 UTC

[spark] branch branch-2.4 updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new faa4c28  [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
faa4c28 is described below

commit faa4c2823b69c1643d7678ee1cb0b7295c611334
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Mon Jan 7 16:53:07 2019 -0800

    [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
    
    ## What changes were proposed in this pull request?
    
    Backport #23324 to branch-2.4.
    
    ## How was this patch tested?
    
    Jenkins
    
    Closes #23365 from zsxwing/SPARK-26267-2.4.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../spark/sql/kafka010/KafkaContinuousReader.scala |  4 +-
 .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 20 ++++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  2 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala     | 80 ++++++++++++++++++++--
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  5 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 48 +++++++++++++
 6 files changed, 146 insertions(+), 13 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8ce56a2..561d501 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -73,7 +73,7 @@ class KafkaContinuousReader(
     offset = start.orElse {
       val offsets = initialOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       logInfo(s"Initial offsets: $offsets")
@@ -128,7 +128,7 @@ class KafkaContinuousReader(
   }
 
   override def needsReconfiguration(): Boolean = {
-    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8cc989f..b6c8035 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader(
     endPartitionOffsets = Option(end.orElse(null))
         .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
         .getOrElse {
-          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+          val latestPartitionOffsets =
+            kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
           maxOffsetsPerTrigger.map { maxOffsets =>
             rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
           }.getOrElse {
@@ -132,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReader(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())
 
     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -192,7 +204,7 @@ private[kafka010] class KafkaMicroBatchReader(
         case EarliestOffsetRangeLimit =>
           KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
           kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c7..6008794 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * the read tasks of the skewed partitions to multiple Spark tasks.
    * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   *
+   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
    */
   def getRanges(
       fromOffsets: PartitionOffsetMap,
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 8206669..fc443d2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
 import java.util.concurrent.{Executors, ThreadFactory}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
         // Poll to get the latest assigned partitions
         consumer.poll(0)
         val partitions = consumer.assignment()
+
+        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+        // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+        // by `poll(0)` may reset offsets that should have been set by another request.
+        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
         consumer.pause(partitions)
         assert(partitions.asScala == partitionOffsets.keySet,
           "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+   * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+   * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+   *
+   * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+   * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+   * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+   * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+   * distinguish this from KAFKA-7703, so we just return whatever we get from Kafka after retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
+
+      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+      // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered
+      // by `poll(0)` may reset offsets that should have been set by another request.
+      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
+         * latest offset (offset in `knownOffsets` is greater than the one in `partitionOffsets`).
+         */
+        def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+          var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
+        //    consumer has a much bigger chance to hit KAFKA-7703.
+        // - Retrying here avoids calling `consumer.poll(0)` again, which may trigger KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets()
+          if (incorrectOffsets.nonEmpty) {
+            logWarning("Found incorrect offsets in some partitions " +
+              s"(partition, previous offset, fetched offset): $incorrectOffsets")
+            if (attempt < maxOffsetFetchAttempts) {
+              logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+              Thread.sleep(offsetFetchAttemptIntervalMs)
+            }
+          }
+        } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+        logDebug(s"Got latest offsets for partition : $partitionOffsets")
+        partitionOffsets
+      }
     }
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 66ec7e0..d65b3ce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val latest = kafkaReader.fetchLatestOffsets()
+    val latest = kafkaReader.fetchLatestOffsets(
+      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
     val offsets = maxOffsetsPerTrigger match {
       case None =>
         latest
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index d89e45e..5f05833 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -327,6 +327,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("subscribe topic by pattern with topic recreation between batches") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array("1", "3"))
+    testUtils.createTopic(topic2, partitions = 1)
+    testUtils.sendMessages(topic2, Array("2", "4"))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("startingOffsets", "earliest")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val ds = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+      .map(kv => kv._2.toInt)
+
+    testStream(ds)(
+      StartStream(),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        true
+      },
+      CheckAnswer(1, 2, 3, 4),
+      // Restart the stream in this test to make the test stable. If a topic is recreated while a
+      // consumer is alive, that consumer may not see the recreated topic even if a fresh consumer
+      // has seen it.
+      StopStream,
+      // Recreate `topic2` and wait until it's available
+      WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+        testUtils.deleteTopic(topic2)
+        testUtils.createTopic(topic2)
+        testUtils.sendMessages(topic2, Array("6"))
+      },
+      StartStream(),
+      ExpectFailure[IllegalStateException](e => {
+        // The offset of `topic2` should be changed from 2 to 1
+        assert(e.getMessage.contains("was changed from 2 to 1"))
+      })
+    )
+  }
+
   test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
     withTempDir { metadataPath =>
       val topic = "kafka-initial-offset-current"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org