Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/06 00:42:54 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

junrao commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1063005106


##########
core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala:
##########
@@ -243,282 +245,287 @@ class LeaderEpochFileCacheTest {
 
     //Given
     val cache = new LeaderEpochFileCache(tp, checkpoint)
-    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(2, 6)
 
     //When
     val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath))
     val cache2 = new LeaderEpochFileCache(tp, checkpoint2)
 
     //Then
     assertEquals(1, cache2.epochEntries.size)
-    assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0))
+    assertEquals(new EpochEntry(2, 6), cache2.epochEntries.get(0))
   }
 
   @Test
   def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = {
     //Given
-    cache.assign(epoch = 1, startOffset = 5);
+    cache.assign(1, 5);
     var logEndOffset = 6
-    cache.assign(epoch = 2, startOffset = 6);
+    cache.assign(2, 6);
     logEndOffset = 7
 
     //When we update an epoch in the past with a different offset, the log has already reached
     //an inconsistent state. Our options are either to raise an error, ignore the new append,
     //or truncate the cached epochs to the point of conflict. We take this latter approach in
     //order to guarantee that epochs and offsets in the cache increase monotonically, which makes
     //the search logic simpler to reason about.
-    cache.assign(epoch = 1, startOffset = 7);
+    cache.assign(1, 7);
     logEndOffset = 8
 
     //Then later epochs will be removed
-    assertEquals(Some(1), cache.latestEpoch)
+    assertEquals(Optional.of(1), cache.latestEpoch)
 
     //Then end offset for epoch 1 will have changed
-    assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset))
+    assertEquals((1, 8), toTuple(cache.endOffsetFor(1, logEndOffset)))
 
     //Then end offset for epoch 2 is now undefined
-    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2, logEndOffset))
-    assertEquals(EpochEntry(1, 7), cache.epochEntries(0))
+    assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(2, logEndOffset)))
+    assertEquals(new EpochEntry(1, 7), cache.epochEntries.get(0))
+  }
+
+  def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = {

Review Comment:
   Could this be private?
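   For reference, a minimal sketch of the suggestion: the helper is only used within this test class, so it could be narrowed to private. The hunk cuts off after the signature, so the Map.Entry-to-tuple body below is assumed rather than taken from the PR:

       // Assumed body: convert a java.util.Map.Entry into a Scala tuple so the
       // assertions above can keep comparing against (epoch, offset) pairs.
       private def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = {
         (entry.getKey, entry.getValue)
       }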



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1294,36 +1293,53 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
         // it may not be true following a message format version bump as the epoch will not be available for
         // log entries written in the older format.
-        val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
-        val epochOpt = earliestEpochEntry match {
-          case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
-          case _ => Optional.empty[Integer]()
+        val earliestEpochEntry = leaderEpochCache match {
+          case Some(cache) => cache.earliestEntry()
+          case None => Optional.empty[EpochEntry]()
         }
+
+        val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) {
+          Optional.of[Integer](earliestEpochEntry.get().epoch)
+        } else Optional.empty[Integer]()
+
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
       } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
         val curLocalLogStartOffset = localLogStartOffset()
-        val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
-          cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry))
-        val epochOpt = earliestLocalLogEpochEntry match {
-          case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch)
-          case _ => Optional.empty[Integer]()
+
+        val earliestLocalLogEpochEntry: Optional[EpochEntry] = leaderEpochCache match {
+          case Some(cache) =>
+            val value = cache.epochForOffset(curLocalLogStartOffset)
+            if (value.isPresent) cache.epochEntry(value.get) else Optional.empty[EpochEntry]()
+          case None => Optional.empty[EpochEntry]()
         }
+
+        val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
+          Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
+        else Optional.empty[Integer]()
+
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
       } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
-        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
-        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
-        Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+

Review Comment:
   extra newline
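   For context, one plausible shape of this branch once the stray blank line is dropped, following the same Optional-based pattern as the branches above. The hunk is cut off here, so the body below is a sketch, not the code actually committed in the PR:

       } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
         // Assumed: latestEpoch() now returns a java.util.Optional[Integer],
         // as the LeaderEpochFileCacheTest changes above suggest.
         val epochOptional = leaderEpochCache match {
           case Some(cache) => cache.latestEpoch()
           case None => Optional.empty[Integer]()
         }
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
       } // ... remaining branches unchanged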



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   Yes, the proposed package names sound good to me.
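   For context, a sketch of how the interface might be filled in under the agreed package. The hunk is truncated after the declaration, so the method set below (mirroring the Scala kafka.server.epoch.LeaderEpochCheckpoint trait it replaces) is an assumption, not the code in the PR:

       package org.apache.kafka.server.log.internals;

       import java.util.List;

       public interface LeaderEpochCheckpoint {
           // Assumed methods: persist the current epoch entries and read them back.
           // The actual PR signatures may differ (e.g. Collection vs List).
           void write(List<EpochEntry> epochs);

           List<EpochEntry> read();
       }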


