You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/03 00:51:09 UTC

[kafka] branch 2.2 updated: KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8d6e2a2  KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
8d6e2a2 is described below

commit 8d6e2a2aac9231deba1a6c17c0ee530c275d13bf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 2 17:46:01 2019 -0700

    KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
    
    During a partial message format upgrade, it is possible for the message format to flap between new and old versions. If we detect that data appended to the log is in an old format, we can clear the leader epoch cache so that we revert to truncation by high watermark. Once the upgrade completes and all replicas are on the same format, we will append to the epoch cache as usual. Note that this is related to KAFKA-7897, which handles message format downgrades through configuration.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 87179db..df71560 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -909,8 +909,17 @@ class Log(@volatile var dir: File,
 
         // update the epoch cache with the epoch stamped onto the message by the leader
         validRecords.batches.asScala.foreach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
             maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+          } else {
+            // In partial upgrade scenarios, we may get a temporary regression of the message format. In
+            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
+            // to truncation by high watermark after the next leader election.
+            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
+              cache.clearAndFlush()
+            }
+          }
         }
 
         // check messages set size may be exceed config.segmentSize
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f0604d5..79a462c 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2176,6 +2176,19 @@ class LogTest {
   }
 
   @Test
+  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
+
+    log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
+      baseOffset = 1L,
+      magicValue = RecordVersion.V1.value))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)