Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/08 00:53:15 UTC
[spark] branch branch-2.4 updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new faa4c28 [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
faa4c28 is described below
commit faa4c2823b69c1643d7678ee1cb0b7295c611334
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Mon Jan 7 16:53:07 2019 -0800
[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
## What changes were proposed in this pull request?
Backport #23324 to branch-2.4.
## How was this patch tested?
Jenkins
Closes #23365 from zsxwing/SPARK-26267-2.4.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../spark/sql/kafka010/KafkaContinuousReader.scala | 4 +-
.../spark/sql/kafka010/KafkaMicroBatchReader.scala | 20 ++++--
.../sql/kafka010/KafkaOffsetRangeCalculator.scala | 2 +
.../spark/sql/kafka010/KafkaOffsetReader.scala | 80 ++++++++++++++++++++--
.../apache/spark/sql/kafka010/KafkaSource.scala | 5 +-
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 48 +++++++++++++
6 files changed, 146 insertions(+), 13 deletions(-)
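
In short, the patch audits the latest offsets returned by Kafka against offsets the query has already seen, and retries the fetch when any offset appears to move backwards. A minimal standalone Scala sketch of that retry-and-audit loop follows; `fetchOnce`, `maxAttempts`, and `retryIntervalMs` are illustrative placeholders, not the actual Spark internals.

import org.apache.kafka.common.TopicPartition

// Sketch only: audit freshly fetched "latest" offsets against known offsets
// and retry while any partition appears to have moved backwards.
def fetchLatestWithAudit(
    knownOffsets: Map[TopicPartition, Long],
    fetchOnce: () => Map[TopicPartition, Long],
    maxAttempts: Int = 3,
    retryIntervalMs: Long = 1000L): Map[TopicPartition, Long] = {
  var attempt = 0
  var fetched = Map.empty[TopicPartition, Long]
  var incorrect = Seq.empty[(TopicPartition, Long, Long)]
  do {
    fetched = fetchOnce()
    attempt += 1
    // A latest offset smaller than one already seen is suspicious
    // (KAFKA-7703), unless the topic was deleted and recreated.
    incorrect = fetched.toSeq.collect {
      case (tp, offset) if knownOffsets.get(tp).exists(_ > offset) =>
        (tp, knownOffsets(tp), offset)
    }
    if (incorrect.nonEmpty && attempt < maxAttempts) Thread.sleep(retryIntervalMs)
  } while (incorrect.nonEmpty && attempt < maxAttempts)
  fetched
}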
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8ce56a2..561d501 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -73,7 +73,7 @@ class KafkaContinuousReader(
offset = start.orElse {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
logInfo(s"Initial offsets: $offsets")
@@ -128,7 +128,7 @@ class KafkaContinuousReader(
}
override def needsReconfiguration(): Boolean = {
- knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+ knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}
override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index 8cc989f..b6c8035 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader(
endPartitionOffsets = Option(end.orElse(null))
.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
.getOrElse {
- val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+ val latestPartitionOffsets =
+ kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
@@ -132,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReader(
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+ val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+ val untilOffsets = endPartitionOffsets
+ untilOffsets.foreach { case (tp, untilOffset) =>
+ fromOffsets.get(tp).foreach { fromOffset =>
+ if (untilOffset < fromOffset) {
+ reportDataLoss(s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed")
+ }
+ }
+ }
+
// Calculate offset ranges
val offsetRanges = rangeCalculator.getRanges(
- fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
- untilOffsets = endPartitionOffsets,
+ fromOffsets = fromOffsets,
+ untilOffsets = untilOffsets,
executorLocations = getSortedExecutorList())
// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -192,7 +204,7 @@ private[kafka010] class KafkaMicroBatchReader(
case EarliestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit =>
- KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+ KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
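
The audit added above runs before range calculation: if a partition's end offset is smaller than its start offset, the reader reports possible data loss instead of silently producing a negative range. A self-contained sketch of the same check, with `report` standing in for the source's reportDataLoss callback:

import org.apache.kafka.common.TopicPartition

// Sketch of the start/end offset audit added above.
def auditOffsets(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long],
    report: String => Unit): Unit = {
  untilOffsets.foreach { case (tp, untilOffset) =>
    fromOffsets.get(tp).foreach { fromOffset =>
      if (untilOffset < fromOffset) {
        report(s"Partition $tp's offset was changed from " +
          s"$fromOffset to $untilOffset, some data may have been missed")
      }
    }
  }
}

For example, auditOffsets(Map(tp -> 2L), Map(tp -> 1L), println) prints the warning, matching the message asserted by the new test further below.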
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c7..6008794 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
* the read tasks of the skewed partitions to multiple Spark tasks.
* The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
* depending on rounding errors or Kafka partitions that didn't receive any new data.
+ *
+ * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
*/
def getRanges(
fromOffsets: PartitionOffsetMap,
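
The new doc line makes explicit that empty ranges are dropped. As a rough illustration of the documented contract (an approximation, not the real KafkaOffsetRangeCalculator), each non-empty partition range gets a share of roughly `minPartitions` pieces proportional to its size:

import org.apache.kafka.common.TopicPartition

// Approximation of the documented behaviour, not the real implementation.
case class OffsetRange(tp: TopicPartition, from: Long, until: Long) {
  def size: Long = until - from
}

def splitRanges(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long],
    minPartitions: Int): Seq[OffsetRange] = {
  val ranges = untilOffsets.toSeq
    .map { case (tp, until) => OffsetRange(tp, fromOffsets.getOrElse(tp, until), until) }
    .filter(_.size > 0) // empty ranges (size <= 0) are dropped
  val total = ranges.map(_.size).sum.max(1L)
  ranges.flatMap { r =>
    // Proportional share, so the task count is only *approximately* minPartitions.
    val pieces = math.max(1L, math.round(r.size.toDouble / total * minPartitions)).toInt
    (0 until pieces).map { i =>
      OffsetRange(r.tp, r.from + r.size * i / pieces, r.from + r.size * (i + 1) / pieces)
    }
  }
}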
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 8206669..fc443d2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -21,6 +21,7 @@ import java.{util => ju}
import java.util.concurrent.{Executors, ThreadFactory}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
@@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
+
+ // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+ // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered by
+ // `poll(0)` may reset offsets that should have been set by another request.
+ partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
consumer.pause(partitions)
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
/**
* Fetch the latest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
+ *
+ * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
+ * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
+ * `poll` to wait until the potential offset request triggered by `poll(0)` is done.
+ *
+ * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
+ * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
+ * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
+ * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
+ * distinguish this from KAFKA-7703, so we just return whatever we get from Kafka after retrying.
*/
- def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+ def fetchLatestOffsets(
+ knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
+
+ // Call `position` to wait until the potential offset request triggered by `poll(0)` is
+ // done. This is a workaround for KAFKA-7703, in which an async `seekToBeginning` triggered by
+ // `poll(0)` may reset offsets that should have been set by another request.
+ partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
+
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
- consumer.seekToEnd(partitions)
- val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
- logDebug(s"Got latest offsets for partition : $partitionOffsets")
- partitionOffsets
+ if (knownOffsets.isEmpty) {
+ consumer.seekToEnd(partitions)
+ partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ } else {
+ var partitionOffsets: PartitionOffsetMap = Map.empty
+
+ /**
+ * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have an incorrect
+ * latest offset (the offset in `knownOffsets` is greater than the one in `partitionOffsets`).
+ */
+ def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
+ var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+ partitionOffsets.foreach { case (tp, offset) =>
+ knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+ if (knownOffset > offset) {
+ val incorrectOffset = (tp, knownOffset, offset)
+ incorrectOffsets += incorrectOffset
+ }
+ })
+ }
+ incorrectOffsets
+ }
+
+ // Retry to fetch latest offsets when detecting incorrect offsets. We don't use
+ // `withRetriesWithoutInterrupt` to retry because:
+ //
+ // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt, but a fresh
+ // consumer is much more likely to hit KAFKA-7703.
+ // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+ var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+ var attempt = 0
+ do {
+ consumer.seekToEnd(partitions)
+ partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+ attempt += 1
+
+ incorrectOffsets = findIncorrectOffsets()
+ if (incorrectOffsets.nonEmpty) {
+ logWarning("Found incorrect offsets in some partitions " +
+ s"(partition, previous offset, fetched offset): $incorrectOffsets")
+ if (attempt < maxOffsetFetchAttempts) {
+ logWarning("Retrying to fetch latest offsets because of incorrect offsets")
+ Thread.sleep(offsetFetchAttemptIntervalMs)
+ }
+ }
+ } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
+
+ logDebug(s"Got latest offsets for partitions: $partitionOffsets")
+ partitionOffsets
+ }
}
}
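
For reference, the `position` workaround used above can be reproduced against the plain Kafka consumer API. A sketch, where the bootstrap address, group id, and topic are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("group.id", "example-group")           // placeholder
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(java.util.Arrays.asList("some-topic")) // placeholder topic
consumer.poll(0) // triggers assignment and possibly an async offset reset
val partitions = consumer.assignment()
// KAFKA-7703 workaround: position() blocks until any offset request started
// by poll(0) completes, so the seekToEnd below cannot be overridden by it.
partitions.asScala.foreach(tp => consumer.position(tp))
consumer.pause(partitions)
consumer.seekToEnd(partitions)
val latestOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
consumer.close()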
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 66ec7e0..d65b3ce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
- case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+ case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
@@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
- val latest = kafkaReader.fetchLatestOffsets()
+ val latest = kafkaReader.fetchLatestOffsets(
+ currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
latest
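
The two call sites above show the intended contract: initial offset resolution passes None because there is nothing to audit against, while later batches pass the offsets the query has already seen. A hypothetical call-site sketch using the names from the diff:

// Hypothetical sketch, names as in the diff above.
val initial = kafkaReader.fetchLatestOffsets(None)          // nothing known yet
val next    = kafkaReader.fetchLatestOffsets(Some(initial)) // audited against `initial`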
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index d89e45e..5f05833 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -327,6 +327,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}
+ test("subscribe topic by pattern with topic recreation between batches") {
+ val topicPrefix = newTopic()
+ val topic = topicPrefix + "-good"
+ val topic2 = topicPrefix + "-bad"
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, Array("1", "3"))
+ testUtils.createTopic(topic2, partitions = 1)
+ testUtils.sendMessages(topic2, Array("2", "4"))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
+ .option("startingOffsets", "earliest")
+ .option("subscribePattern", s"$topicPrefix-.*")
+
+ val ds = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ .map(kv => kv._2.toInt)
+
+ testStream(ds)(
+ StartStream(),
+ AssertOnQuery { q =>
+ q.processAllAvailable()
+ true
+ },
+ CheckAnswer(1, 2, 3, 4),
+ // Restart the stream in this test to keep it stable. When a topic is recreated while a
+ // consumer is alive, that consumer may not see the recreated topic even if a fresh
+ // consumer has seen it.
+ StopStream,
+ // Recreate `topic2` and wait until it's available
+ WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
+ testUtils.deleteTopic(topic2)
+ testUtils.createTopic(topic2)
+ testUtils.sendMessages(topic2, Array("6"))
+ },
+ StartStream(),
+ ExpectFailure[IllegalStateException](e => {
+ // The offset of `topic2` should be changed from 2 to 1
+ assert(e.getMessage.contains("was changed from 2 to 1"))
+ })
+ )
+ }
+
test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
withTempDir { metadataPath =>
val topic = "kafka-initial-offset-current"
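
The IllegalStateException expected by the new test above comes from the Kafka source's default failOnDataLoss=true behaviour. When offsets legitimately move backwards (for example, an intentional topic recreation), a query can choose to log a warning instead of failing; a sketch of the reader configuration, with placeholder address and pattern:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribePattern", "topic-.*")              // placeholder
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false") // warn instead of failing on offset regressions
  .load()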