Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/05/31 11:01:44 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13782: Suggest for performance fix: KAFKA-9693 Kafka latency spikes caused by log segment flush on roll - trunk version

divijvaidya commented on code in PR #13782:
URL: https://github.com/apache/kafka/pull/13782#discussion_r1211516603


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -679,9 +688,21 @@ private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entri
         long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET);
         ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);
 
-        try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
-            fileChannel.write(buffer);
+        FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        fileChannel.write(buffer);
+        if (scheduler != null) {
+            scheduler.scheduleOnce("flush-snapshot", () -> closeSnapshotFile(fileChannel));
+        } else {
+            closeSnapshotFile(fileChannel);
+        }
+    }
+
+    private static void closeSnapshotFile(FileChannel fileChannel) {
+        try {
             fileChannel.force(true);
+            fileChannel.close();

Review Comment:
   Let's decouple the exception handling of `close` from the `force` call above. We want to try releasing the resource even if the flush throws an exception.
   
   Also, you can use `Utils.closeQuietly` to close the fileChannel. It does not propagate the exception; instead it just logs a message at `warn` level. This ensures the thread running this task is not terminated elsewhere by a propagated exception.
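   
   For illustration, a minimal sketch of the decoupled shape (the slf4j `log` field and the warn-only handling of the `force` failure are assumptions about this class, not a definitive implementation):
   
   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import org.apache.kafka.common.utils.Utils;
   
   // assumes: private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class);
   
   private static void closeSnapshotFile(FileChannel fileChannel) {
       try {
           // Flush the snapshot contents to disk.
           fileChannel.force(true);
       } catch (IOException e) {
           // Handle the flush failure separately from close; logging here is a
           // placeholder -- the caller may prefer to propagate it instead.
           log.warn("Failed to flush producer snapshot file", e);
       } finally {
           // Always attempt to release the file handle, even if force() threw.
           // closeQuietly logs at warn level rather than propagating, so the
           // thread running this task cannot be killed by a failing close().
           Utils.closeQuietly(fileChannel, "producer snapshot file channel");
       }
   }
   ```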



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -679,9 +688,21 @@ private static void writeSnapshot(File file, Map<Long, ProducerStateEntry> entri
         long crc = Crc32C.compute(buffer, PRODUCER_ENTRIES_OFFSET, buffer.limit() - PRODUCER_ENTRIES_OFFSET);
         ByteUtils.writeUnsignedInt(buffer, CRC_OFFSET, crc);
 
-        try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
-            fileChannel.write(buffer);
+        FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+        fileChannel.write(buffer);
+        if (scheduler != null) {
+            scheduler.scheduleOnce("flush-snapshot", () -> closeSnapshotFile(fileChannel));

Review Comment:
   suggestion: rename the task to `flush-producer-snapshot` so the name says which snapshot is being flushed.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1498,7 +1498,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    producerStateManager.takeSnapshot(scheduler)

Review Comment:
   I am wondering if we need to increase the default size of the background thread pool, since we are adding more responsibility to it. Thoughts?
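   
   For reference, the pool in question is the broker's background scheduler, which is sized from the `background.threads` broker config (default 10). A rough sketch of sizing and using the server-common scheduler directly, e.g. in a test; the constructor signature is my reading of `org.apache.kafka.server.util.KafkaScheduler` and should be treated as an assumption:
   
   ```java
   import org.apache.kafka.server.util.KafkaScheduler;
   
   public class SchedulerSizingSketch {
       public static void main(String[] args) throws InterruptedException {
           // The broker builds its scheduler from `background.threads`; raising
           // that config is the knob if snapshot flushes start crowding the pool.
           KafkaScheduler scheduler = new KafkaScheduler(10, true, "kafka-scheduler-");
           scheduler.startup();
   
           // The PR adds one more one-shot task type to this shared pool.
           scheduler.scheduleOnce("flush-producer-snapshot",
               () -> System.out.println("force() + close the snapshot channel here"));
   
           scheduler.shutdown();
       }
   }
   ```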


