You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/03 01:08:50 UTC

[kafka] branch 2.1 updated: KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 61dac29  KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
61dac29 is described below

commit 61dac2965f628e6dfc330181f534388a479d954f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 2 17:46:01 2019 -0700

    KAFKA-7601; Clear leader epoch cache on downgraded format in append (#6568)
    
    During a partial message format upgrade, it is possible for the message format to flap between new and old versions. If we detect that data appended to the log is on an old format, we can clear the leader epoch cache so that we revert to truncation by high watermark. Once the upgrade completes and all replicas are on the same format, we will append to the epoch cache as usual. Note this is related to KAFKA-7897, which handles message format downgrades through configuration.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index af0f343..9811a0e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -907,8 +907,17 @@ class Log(@volatile var dir: File,
 
         // update the epoch cache with the epoch stamped onto the message by the leader
         validRecords.batches.asScala.foreach { batch =>
-          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
             maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
+          } else {
+            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
+            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
+            // to truncation by high watermark after the next leader election.
+            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
+              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
+              cache.clearAndFlush()
+            }
+          }
         }
 
         // check messages set size may be exceed config.segmentSize
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2af97fa..cbc83a9 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2135,6 +2135,19 @@ class LogTest {
   }
 
   @Test
+  def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
+    assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch))
+
+    log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())),
+      baseOffset = 1L,
+      magicValue = RecordVersion.V1.value))
+    assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch))
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)