Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/01 00:37:27 UTC

[GitHub] [kafka] junrao commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605293308



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {

Review comment:
       Ideally, we should flush the parent dir before setting needFlushParentDir to false. Otherwise, if the flush throws, the flag is already cleared and the flush is never retried.

##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -490,11 +490,14 @@ class LogSegment private[log] (val log: FileRecords,
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = {

Review comment:
       Hmm, we need to pass needFlushParentDir to each of log.renameTo and index.renameTo to disable flushing, right?
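
One way to read this comment: each rename skips its own directory fsync, and the caller issues a single fsync once all renames are done. The sketch below illustrates that pattern in plain Java. It is not Kafka's code: `BatchedRenameSketch` and its `renameTo`, `flushDir`, and `changeFileSuffixes` methods are hypothetical stand-ins, all files are assumed to share one parent directory, and opening a directory through `FileChannel` is assumed to work on the platform (it generally does on Linux and generally does not on Windows).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch, not Kafka code: rename several files, fsync the directory once.
final class BatchedRenameSketch {
    // Counts directory fsyncs so the effect of batching is observable.
    static int dirFlushCount = 0;

    // fsync a directory by opening it read-only and forcing it (assumes a POSIX-like OS).
    static void flushDir(Path dir) throws IOException {
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
        dirFlushCount++;
    }

    // Rename one file; the caller decides whether this rename pays for its own fsync.
    static Path renameTo(Path source, Path target, boolean needFlushParentDir) throws IOException {
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        if (needFlushParentDir) {
            flushDir(target.toAbsolutePath().getParent());
        }
        return target;
    }

    // Swap the suffix on every file with per-file flushing disabled, then flush the
    // shared parent once. Assumes every file name ends with oldSuffix and that all
    // files live in the same directory.
    static void changeFileSuffixes(List<Path> files, String oldSuffix, String newSuffix) throws IOException {
        Path parent = null;
        for (Path file : files) {
            String name = file.getFileName().toString();
            String base = name.substring(0, name.length() - oldSuffix.length());
            Path target = renameTo(file, file.resolveSibling(base + newSuffix), false);
            parent = target.toAbsolutePath().getParent();
        }
        if (parent != null) {
            flushDir(parent);
        }
    }
}
```

With the flag threaded through like this, renaming a log file and its index files costs one directory fsync instead of one per file.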

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +442,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable);

Review comment:
       This is OK but not the most precise. We only need to set the flush flag to true if the file is mutable and log recovery is needed.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {
+            Utils.flushParentDir(file.toPath());
+        }
+    }
+
+    /**
+     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
+     */
+    public void flushParentDir() throws IOException {
+        needFlushParentDir.set(false);

Review comment:
       Ideally, we should flush the parent dir before setting needFlushParentDir to false.
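
The ordering concern can be sketched as follows. This is an illustration under stated assumptions, not the PR's code: `FlushOrderingSketch` and its `DirFlusher` interface are hypothetical, and the flusher is injected only so that a failing flush can be simulated. The point is that clearing the flag after the flush leaves it set when the flush throws, so a later `flush()` tries again.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Kafka code: clear the flag only after the directory flush succeeds.
final class FlushOrderingSketch {
    // Stand-in for a directory fsync such as Utils.flushParentDir in the diff above.
    interface DirFlusher {
        void flushParentDir() throws IOException;
    }

    private final AtomicBoolean needFlushParentDir = new AtomicBoolean(true);
    private final DirFlusher flusher;

    FlushOrderingSketch(DirFlusher flusher) {
        this.flusher = flusher;
    }

    // Flush first; if the flush throws, the flag stays true and the next call retries.
    void flush() throws IOException {
        if (needFlushParentDir.get()) {
            flusher.flushParentDir();
            needFlushParentDir.set(false);
        }
    }

    boolean needsParentDirFlush() {
        return needFlushParentDir.get();
    }
}
```

The trade-off is that `get()` followed by `set(false)` is not atomic, so two concurrent callers may both flush the directory. That is redundant but harmless, whereas `getAndSet(false)` before the flush avoids the duplicate at the cost of losing the retry when the flush fails.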

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -848,6 +849,7 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
       try {
         Files.createDirectories(dir.toPath)
+        Utils.flushParentDir(dir.toPath)

Review comment:
       I wonder if we should also flush the parent dir when we delete a log. This is not strictly required on every delete, so one option is to flush every parent dir when closing the LogManager.
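
A rough sketch of the deferred option, under assumptions: `DeferredDirFlushSketch` is a hypothetical class, not part of LogManager, and it tracks only the directories touched by a delete rather than every log directory. A simpler variant, closer to the wording of the comment, would flush every log directory unconditionally on close. Directory fsync through `FileChannel` is again assumed to be supported by the platform.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch, not Kafka code: defer directory fsyncs for deletes until close().
final class DeferredDirFlushSketch implements AutoCloseable {
    // Parent directories whose entries changed since the last flush.
    private final Set<Path> dirtyDirs = new LinkedHashSet<>();
    // Number of directory fsyncs issued, exposed for illustration.
    int flushCount = 0;

    // Delete a file or empty directory and remember that its parent needs an fsync.
    void delete(Path path) throws IOException {
        Files.delete(path);
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            dirtyDirs.add(parent);
        }
    }

    // Flush each dirty parent directory once (assumes a POSIX-like OS).
    @Override
    public void close() throws IOException {
        for (Path dir : dirtyDirs) {
            try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
                channel.force(true);
            }
            flushCount++;
        }
        dirtyDirs.clear();
    }
}
```

Many deletes in the same directory then cost a single fsync at shutdown, at the price of a window in which a crash can leave a deleted log visible again after restart.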




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org