You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/26 00:37:05 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9219: KAFKA-10432: LeaderEpochCache is incorrectly recovered for leader epoch 0

hachikuji commented on a change in pull request #9219:
URL: https://github.com/apache/kafka/pull/9219#discussion_r476915997



##########
File path: core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
##########
@@ -367,6 +371,45 @@ class LogSegmentTest {
     assertEquals(100L, abortedTxn.lastStableOffset)
   }
 
+  /**
+   * Create a segment with some data, then recover the segment.
+   * The epoch cache entries should reflect the segment.
+   */
+  @Test
+  def testRecoveryRebuildsEpochCache(): Unit = {
+    val seg = createSegment(0)
+
+    val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+      private var epochs = Seq.empty[EpochEntry]
+
+      override def write(epochs: Seq[EpochEntry]): Unit = {
+        this.epochs = epochs.toVector
+      }
+
+      override def read(): Seq[EpochEntry] = this.epochs
+    }
+
+    val cache = new LeaderEpochFileCache(topicPartition, () => seg.readNextOffset, checkpoint)
+    seg.append(largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE, 0,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 106L, MemoryRecords.withRecords(106L, CompressionType.NONE, 1,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 109L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 108L, MemoryRecords.withRecords(108L, CompressionType.NONE, 1,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.append(largestOffset = 111L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
+      shallowOffsetOfMaxTimestamp = 110, MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
+        new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+
+    seg.recover(new ProducerStateManager(topicPartition, logDir), Some(cache))
+    assertEquals(ArrayBuffer(EpochEntry(0, 104L), EpochEntry(epoch=1, startOffset=106), EpochEntry(epoch=2, startOffset=110)), cache.epochEntries)

Review comment:
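       nit: use argument names consistently? The first entry is written positionally (`EpochEntry(0, 104L)`), while the other two use named arguments (`epoch=`, `startOffset=`).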




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org