Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 19:50:09 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11345: Allow empty last segment to have missing offset index during recovery

hachikuji commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r760521586



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -443,7 +443,7 @@ private void onBecomeLeader(long currentTimeMs) {
     private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
         // We update the end offset before flushing so that parked fetches can return sooner.
         updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
-        log.flush();
+        log.flush(false);

Review comment:
       This is probably ok. The way that KRaft manages log retention is a bit different from normal partitions. In general, segments are only deleted once we have a snapshot which covers all of their data. That should mean that there is no risk today of losing the log end offset even if an unflushed empty segment is lost. In other words, even if all of the segments are lost, we should still have a snapshot to derive the log end offset from.
   
   cc @jsancio 
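   To make the retention argument concrete, here is a minimal Java sketch. It assumes the invariant stated above: a segment is deleted only after a snapshot covers all of its data. It also assumes the snapshot end offset is exclusive. `RecoverySketch`, `RecoveredSegment`, and `recoverLogEndOffset` are hypothetical names, not KafkaRaftClient or KafkaMetadataLog APIs.

```java
import java.util.List;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: recovering a log end offset from the surviving segments
 * and the latest snapshot. Not actual KRaft code; names are illustrative only.
 */
public class RecoverySketch {

    /** A recovered segment covering [baseOffset, nextOffset); empty when both are equal. */
    static final class RecoveredSegment {
        final long baseOffset;
        final long nextOffset;

        RecoveredSegment(long baseOffset, long nextOffset) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Assumed invariant: segments are deleted only once a snapshot covers them, so the
     * snapshot end offset is a lower bound on the log end offset even if segments are lost.
     */
    static long recoverLogEndOffset(List<RecoveredSegment> segments, OptionalLong snapshotEndOffset) {
        long fromSegments = segments.stream()
            .mapToLong(s -> s.nextOffset)
            .max()
            .orElse(0L);
        return Math.max(fromSegments, snapshotEndOffset.orElse(0L));
    }

    public static void main(String[] args) {
        // An unflushed, empty active segment at base offset 100 was lost in a crash;
        // the older segment [0, 100) survived, so nothing is lost.
        List<RecoveredSegment> survivors = List.of(new RecoveredSegment(0, 100));
        System.out.println(recoverLogEndOffset(survivors, OptionalLong.of(80))); // prints 100

        // Every segment was lost, but a snapshot ending at offset 100 remains.
        System.out.println(recoverLogEndOffset(List.of(), OptionalLong.of(100))); // prints 100
    }
}
```

   The second case is the one the comment relies on: with no segments at all, the snapshot alone still yields the correct log end offset.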

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,28 +1498,44 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.takeSnapshot()
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+    scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
     newSegment
   }
 
   /**
    * Flush all local log segments
+   *
+   * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we persist the active segment file during shutdown, particularly
+   * when it's empty.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)

Review comment:
       The addition of this flag seems to be an optimization so that we are not forced to flush newly created (empty) segments. Do you have a sense for how valuable this optimization is? The additional noise it adds to the log API is a bit unfortunate, so it would be helpful to understand what we're getting in return.
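   A minimal Java sketch of what the flag controls. It assumes, as the doc comment in the diff states, that flushing is bounded by an exclusive offset. It also assumes that a freshly rolled active segment is empty, with a base offset equal to the log end offset. `FlushSketch`, `Segment`, and `flushUpToOffsetExclusive` are hypothetical stand-ins, not Kafka's LocalLog or UnifiedLog. Recovery points and the actual fsync are omitted.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model: why flush(logEndOffset) skips an empty active segment
 * while flush(logEndOffset + 1) includes it. Not Kafka code.
 */
public class FlushSketch {

    /** Covers offsets [baseOffset, nextOffset); empty when nextOffset == baseOffset. */
    static final class Segment {
        final long baseOffset;
        long nextOffset;
        boolean flushed = false; // stands in for an fsync of the segment file

        Segment(long baseOffset) {
            this.baseOffset = baseOffset;
            this.nextOffset = baseOffset;
        }
    }

    static final class Log {
        final List<Segment> segments = new ArrayList<>();

        Log() { segments.add(new Segment(0L)); }

        Segment activeSegment() { return segments.get(segments.size() - 1); }

        long logEndOffset() { return activeSegment().nextOffset; }

        void append(int records) { activeSegment().nextOffset += records; }

        /** Roll: start a new, empty active segment at the current log end offset. */
        Segment roll() {
            Segment s = new Segment(logEndOffset());
            segments.add(s);
            return s;
        }

        /** Flush every segment whose base offset is strictly below the given bound. */
        void flushUpToOffsetExclusive(long offset) {
            for (Segment s : segments) {
                if (s.baseOffset < offset) {
                    s.flushed = true;
                }
            }
        }

        /** Mirrors the proposed flag: forcing adds 1 so an empty active segment is included. */
        void flush(boolean forceFlushActiveSegment) {
            long bound = forceFlushActiveSegment ? logEndOffset() + 1 : logEndOffset();
            flushUpToOffsetExclusive(bound);
        }
    }

    public static void main(String[] args) {
        Log normal = new Log();
        normal.append(10);
        normal.roll(); // new empty active segment at base offset 10
        normal.flush(false);
        System.out.println("without force: " + normal.activeSegment().flushed); // prints false

        Log shutdown = new Log();
        shutdown.append(10);
        shutdown.roll();
        shutdown.flush(true);
        System.out.println("with force: " + shutdown.activeSegment().flushed); // prints true
    }
}
```

   In this model the flag only changes behavior when the active segment is empty. A non-empty active segment has a base offset below the log end offset and is flushed either way, so the saving is limited to skipping one flush of a just-rolled, empty segment.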




-- 
This is an automated message from the Apache Git Service.