Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/25 23:10:28 UTC

[GitHub] [kafka] ccding opened a new pull request #10405: [KAFKA-3968] fsync the parent directory of a segment file when the file is created

ccding opened a new pull request #10405:
URL: https://github.com/apache/kafka/pull/10405


   Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk.
   
   The problem is that the following sequence of calls doesn't guarantee file durability:
   
   fd = open("log", O_RDWR | O_CREAT, 0644); // suppose open() creates "log"
   write(fd, buf, len);
   fsync(fd);
   
   If the system crashes after fsync() but before the parent directory has been flushed to disk, the new directory entry may not have been persisted, so the log file can disappear.
   
   This PR flushes the parent directory when flush() is called on the file for the first time.
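A minimal Java sketch of the durable-create sequence described above follows. It assumes a POSIX file system (e.g. Linux), where a directory can be opened read-only and fsynced through `FileChannel.force`. On Windows, opening a directory as a channel typically fails. The class and method names are hypothetical, and this is not Kafka's `Utils.flushParentDir`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: fsync the new file, then fsync its parent directory.
class DurableCreate {

    // Creates `file`, writes `data`, and makes both the contents and the
    // directory entry durable. Assumes a POSIX file system.
    static void createDurably(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf);      // a single write may be partial, so loop
            }
            ch.force(true);         // like fsync(fd): file data and metadata
        }
        // Without this step the new directory entry may be lost on a crash.
        flushDir(file.toAbsolutePath().getParent());
    }

    // Opens the directory read-only and forces it to disk.
    static void flushDir(Path dir) throws IOException {
        try (FileChannel dirChannel = FileChannel.open(dir, StandardOpenOption.READ)) {
            dirChannel.force(true);
        }
    }
}
```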


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ccding edited a comment on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812645668


   Failed tests are unrelated (connect and streams) and passed on my local run.





[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605902522



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +442,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable);

Review comment:
       Fixed by passing in `hadCleanShutdown`.







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605902276



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {
+            Utils.flushParentDir(file.toPath());
+        }
+    }
+
+    /**
+     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
+     */
+    public void flushParentDir() throws IOException {
+        needFlushParentDir.set(false);

Review comment:
       Same as above.
   
   Setting the flag first prevents other threads that call `flush` concurrently from also flushing the parent directory.
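The flag handling in this hunk can be sketched in isolation. The class below is hypothetical and simplified, not Kafka's `FileRecords`. Counters replace the real fsync calls so the behavior is observable. However many threads call `flush()` concurrently, the parent directory is flushed at most once, because `AtomicBoolean.getAndSet(false)` returns `true` to at most one caller.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the flag logic; counters replace real fsync calls.
class ParentDirFlushFlag {
    private final AtomicBoolean needFlushParentDir;
    final AtomicInteger fileFlushes = new AtomicInteger();
    final AtomicInteger parentDirFlushes = new AtomicInteger();

    ParentDirFlushFlag(boolean needFlushParentDir) {
        this.needFlushParentDir = new AtomicBoolean(needFlushParentDir);
    }

    void flush() {
        fileFlushes.incrementAndGet();            // stands in for channel.force(true)
        // Only the first caller that observes `true` flushes the parent directory.
        if (needFlushParentDir.getAndSet(false)) {
            parentDirFlushes.incrementAndGet();   // stands in for the directory fsync
        }
    }

    void flushParentDir() {
        // Clear the flag before flushing, so concurrent flush() calls skip
        // the now-redundant parent directory flush.
        needFlushParentDir.set(false);
        parentDirFlushes.incrementAndGet();
    }
}
```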








[GitHub] [kafka] ccding commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812282452


   Fixed comments from @junrao 
   
   Also addressed the two problems we discussed offline:
   - flush the parent directory of new segments during their first flush: added the `!fileAlreadyExists` check
   - flush the parent directory after flushing the log file and all index files
   
   Please take a look @junrao 





[GitHub] [kafka] junrao commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r602428120



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -249,6 +266,7 @@ public void renameTo(File f) throws IOException {
         } finally {
             this.file = f;
         }
+        needFlushParentDir.set(true);

Review comment:
       Hmm, this seems problematic. For example, when we do log cleaning, the steps are (1) write cleaned data to a new segment with .clean suffix; (2) flush the new segment; (3) rename the .clean file to .swap; (4) rename .swap to .log. There is no additional flush called after renaming. So, this flag won't trigger the flushing of the parent directory. 
   
   One way is to add a method that explicitly forces the flushing of the parent directory after renaming and add the call after step 4.
   
   Also, it seems that we need logic to flush the parent directory of topic-partition as well. This is needed when a new topic partition is added to or deleted from a broker, or when a partition is moved across disks in JBOD. The latter has the following steps: (1) copy the log segments in directory topic-partition on one disk to directory topic-partition-future on another disk; (2) once the copying is done, rename topic-partition-future to topic-partition. After step (2), it seems that we need to flush the parent directory on both the old and the new disk.
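One way to realize the suggested explicit parent-directory flush after step 4 is sketched below. The file naming (`<name>.log.clean` and `<name>.log.swap`), the `CleanerRenames` class, and the use of `Files.move` with `ATOMIC_MOVE` are assumptions for illustration, not Kafka's log cleaner code. The directory flush again assumes a POSIX file system.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: rename steps (3) and (4) followed by an explicit
// parent-directory flush, since no later file flush would trigger one.
class CleanerRenames {

    static void flushDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    // Assumes step (2), flushing the .clean file's contents, already happened.
    static Path swapIn(Path cleanFile) throws IOException {
        String name = cleanFile.getFileName().toString();
        if (!name.endsWith(".clean")) {
            throw new IllegalArgumentException("expected a .clean file: " + name);
        }
        String logName = name.substring(0, name.length() - ".clean".length());
        Path swap = cleanFile.resolveSibling(logName + ".swap");
        Path log = cleanFile.resolveSibling(logName);
        Files.move(cleanFile, swap, StandardCopyOption.ATOMIC_MOVE);  // step (3)
        Files.move(swap, log, StandardCopyOption.ATOMIC_MOVE);        // step (4)
        flushDir(log.toAbsolutePath().getParent());                   // persist the renames
        return log;
    }
}
```

The same idea would apply to the JBOD case: after renaming topic-partition-future, flush the parent directory on each disk whose entries changed.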

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +445,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists);

Review comment:
       The condition `mutable && !fileAlreadyExists` doesn't seem complete. When a broker is restarted, all existing log segments are opened with both `mutable` and `fileAlreadyExists` set. However, segments beyond the recovery point may not have been flushed before. When they are flushed, we also need to flush the parent directory.







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606039455



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -657,17 +668,19 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
+           needsRecovery: Boolean = true): LogSegment = {

Review comment:
       Yeah, I changed this but didn't push. Thanks







[GitHub] [kafka] ijuma commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812801416


   Would the perf impact of this change be more significant with a larger number of partitions?





[GitHub] [kafka] ccding commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812642793


   @junrao Thanks for the review. The previous test was run with the default settings in `config/server.properties`, where `log.segment.bytes=1073741824` (1GB). I changed it to `log.segment.bytes=10737418` (~10MB) and re-ran the test, again with a 1KB record size.
   
   The result before applying this PR:
   ```
   384286 records sent, 76749.8 records/sec (74.95 MB/sec), 361.0 ms avg latency, 568.0 ms max latency.
   486450 records sent, 96575.3 records/sec (94.31 MB/sec), 318.9 ms avg latency, 346.0 ms max latency.
   476100 records sent, 94802.9 records/sec (92.58 MB/sec), 322.1 ms avg latency, 368.0 ms max latency.
   473010 records sent, 94602.0 records/sec (92.38 MB/sec), 328.8 ms avg latency, 370.0 ms max latency.
   462570 records sent, 92514.0 records/sec (90.35 MB/sec), 329.6 ms avg latency, 363.0 ms max latency.
   462405 records sent, 92481.0 records/sec (90.31 MB/sec), 331.4 ms avg latency, 373.0 ms max latency.
   475485 records sent, 95097.0 records/sec (92.87 MB/sec), 322.9 ms avg latency, 353.0 ms max latency.
   475980 records sent, 95157.9 records/sec (92.93 MB/sec), 322.4 ms avg latency, 380.0 ms max latency.
   476190 records sent, 95238.0 records/sec (93.01 MB/sec), 323.9 ms avg latency, 366.0 ms max latency.
   474345 records sent, 94869.0 records/sec (92.65 MB/sec), 326.8 ms avg latency, 386.0 ms max latency.
   488115 records sent, 96752.2 records/sec (94.48 MB/sec), 314.0 ms avg latency, 344.0 ms max latency.
   485220 records sent, 97044.0 records/sec (94.77 MB/sec), 320.9 ms avg latency, 358.0 ms max latency.
   487740 records sent, 97548.0 records/sec (95.26 MB/sec), 311.4 ms avg latency, 353.0 ms max latency.
   493755 records sent, 98751.0 records/sec (96.44 MB/sec), 313.8 ms avg latency, 348.0 ms max latency.
   
   ```
   
   The result after applying this PR:
   ```
   253786 records sent, 50757.2 records/sec (49.57 MB/sec), 542.9 ms avg latency, 1099.0 ms max latency.
   439665 records sent, 87862.7 records/sec (85.80 MB/sec), 351.7 ms avg latency, 487.0 ms max latency.
   458580 records sent, 91716.0 records/sec (89.57 MB/sec), 337.6 ms avg latency, 417.0 ms max latency.
   477015 records sent, 95403.0 records/sec (93.17 MB/sec), 322.0 ms avg latency, 359.0 ms max latency.
   492705 records sent, 97584.7 records/sec (95.30 MB/sec), 313.8 ms avg latency, 344.0 ms max latency.
   492240 records sent, 98448.0 records/sec (96.14 MB/sec), 314.3 ms avg latency, 358.0 ms max latency.
   495810 records sent, 99162.0 records/sec (96.84 MB/sec), 308.9 ms avg latency, 357.0 ms max latency.
   483675 records sent, 96735.0 records/sec (94.47 MB/sec), 317.1 ms avg latency, 365.0 ms max latency.
   478230 records sent, 95646.0 records/sec (93.40 MB/sec), 319.8 ms avg latency, 365.0 ms max latency.
   482295 records sent, 95560.7 records/sec (93.32 MB/sec), 321.1 ms avg latency, 427.0 ms max latency.
   491430 records sent, 98286.0 records/sec (95.98 MB/sec), 315.5 ms avg latency, 373.0 ms max latency.
   489615 records sent, 97923.0 records/sec (95.63 MB/sec), 314.0 ms avg latency, 367.0 ms max latency.
   405855 records sent, 81154.8 records/sec (79.25 MB/sec), 374.0 ms avg latency, 485.0 ms max latency.
   455400 records sent, 91061.8 records/sec (88.93 MB/sec), 338.5 ms avg latency, 416.0 ms max latency.
   470325 records sent, 94065.0 records/sec (91.86 MB/sec), 327.5 ms avg latency, 424.0 ms max latency.
   444465 records sent, 88893.0 records/sec (86.81 MB/sec), 343.3 ms avg latency, 426.0 ms max latency.
   410010 records sent, 81789.3 records/sec (79.87 MB/sec), 374.5 ms avg latency, 485.0 ms max latency.
   460455 records sent, 92091.0 records/sec (89.93 MB/sec), 338.6 ms avg latency, 411.0 ms max latency.
   ```
   
   We can clearly see some differences. Is this a concern?





[GitHub] [kafka] ccding commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812744839


   > It's interesting that the absolute throughput dropped significantly with 10MB segments compared with 1GB segment.
   
   I think it is the cost of the extra flush. There is one extra flush per segment, which is one extra flush per ~10,000 records for 10MB segments and 1KB records. With 1GB segments, there is one extra flush per ~1,000,000 records: 1/100 of the cost.
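The per-segment arithmetic can be checked directly with the hypothetical helper below. It assumes 1KB means 1024 bytes per record and ignores record-batch overhead, so the counts are approximate. The 100MB size (104857600 bytes) is also an assumption, since the exact setting for that run is not stated.

```java
// Hypothetical helper: how many ~1KB records fit in one segment, i.e. how many
// records are appended between consecutive extra parent-directory flushes.
class FlushArithmetic {

    static long recordsPerSegment(long segmentBytes, long recordBytes) {
        return segmentBytes / recordBytes;   // integer division; ignores batch overhead
    }

    // How many times more records share one extra flush with the larger segment.
    static double flushCostRatio(long smallSegment, long largeSegment, long recordBytes) {
        return (double) recordsPerSegment(largeSegment, recordBytes)
                / recordsPerSegment(smallSegment, recordBytes);
    }
}
```

With 10737418-byte segments this gives 10485 records per extra flush. With 1073741824-byte segments it gives 1048576, a ratio of about 100, which matches the 1/100 estimate.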





[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606039773



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
+  /* whether or not we need to flush the parent dir during flush */

Review comment:
       Not really. We change the value of `atomicNeedsFlushParentDir` after the first flush; the `needsFlushParentDir` value passed to the constructor only applies to the first flush. Do you have any suggestions on how to document them?







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605903207



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -848,6 +849,7 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
       try {
         Files.createDirectories(dir.toPath)
+        Utils.flushParentDir(dir.toPath)

Review comment:
       Per our offline discussion, we decided not to flush at deletion. Deletions are async and can be retried after rebooting.
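The change in this hunk, which creates the partition directory and then flushes its parent, can be sketched as below. The sketch assumes a POSIX file system, and the `PartitionDirs` class and its methods are hypothetical. Consistent with the discussion, the delete path does not flush the parent directory.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of directory creation followed by a parent-directory flush.
class PartitionDirs {

    static void flushParentDir(Path path) throws IOException {
        Path parent = path.toAbsolutePath().getParent();
        try (FileChannel ch = FileChannel.open(parent, StandardOpenOption.READ)) {
            ch.force(true);   // persists the new entry in the log directory
        }
    }

    // Creates <logDir>/<topic>-<partition>. Only the immediate parent is flushed,
    // so this assumes logDir itself already exists durably.
    static Path create(Path logDir, String topic, int partition) throws IOException {
        Path dir = logDir.resolve(topic + "-" + partition);
        Files.createDirectories(dir);
        flushParentDir(dir);
        return dir;
    }

    // No directory flush after deletion: if the entry reappears after a crash,
    // the asynchronous delete can be retried. Assumes the directory is empty.
    static void delete(Path dir) throws IOException {
        Files.deleteIfExists(dir);
    }
}
```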







[GitHub] [kafka] junrao commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605955876



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -320,7 +320,7 @@ class Log(@volatile private var _dir: File,
     initializeLeaderEpochCache()
     initializePartitionMetadata()
 
-    val nextOffset = loadSegments()
+    val nextOffset = loadSegments(hadCleanShutdown)

Review comment:
       There is no need to pass hadCleanShutdown in since it's already accessible from loadSegments().

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -424,21 +439,22 @@ public static FileRecords open(File file,
                                    boolean mutable,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
+                                   boolean preallocate,
+                                   boolean hadCleanShutdown) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable && !hadCleanShutdown);
     }
 
     public static FileRecords open(File file,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
-        return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+                                   boolean preallocate, boolean hadCleanShutdown) throws IOException {

Review comment:
       It's probably more intuitive to change hadCleanShutdown to needsRecovery and pass in the negation of the flag. Then, the default value of false makes more sense.
   
   Also, could we put the change in a separate line to match the existing format?







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606040867



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -424,21 +439,22 @@ public static FileRecords open(File file,
                                    boolean mutable,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
+                                   boolean preallocate,
+                                   boolean hadCleanShutdown) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable && !hadCleanShutdown);
     }
 
     public static FileRecords open(File file,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
-        return open(file, true, fileAlreadyExists, initFileSize, preallocate);
+                                   boolean preallocate, boolean hadCleanShutdown) throws IOException {

Review comment:
       Fixed.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -320,7 +320,7 @@ class Log(@volatile private var _dir: File,
     initializeLeaderEpochCache()
     initializePartitionMetadata()
 
-    val nextOffset = loadSegments()
+    val nextOffset = loadSegments(hadCleanShutdown)

Review comment:
       Fixed.







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606040117



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -433,7 +440,8 @@ public static FileRecords open(File file,
     public static FileRecords open(File file,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
+                                   boolean preallocate,
+                                   boolean needsRecovery) throws IOException {

Review comment:
       True.







[GitHub] [kafka] ccding commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812744102


   With 100MB segment size:
   
   Before this PR:
   ```
   422206 records sent, 84441.2 records/sec (82.46 MB/sec), 333.0 ms avg latency, 997.0 ms max latency.
   1016955 records sent, 203391.0 records/sec (198.62 MB/sec), 152.7 ms avg latency, 230.0 ms max latency.
   986760 records sent, 197352.0 records/sec (192.73 MB/sec), 155.5 ms avg latency, 293.0 ms max latency.
   1025070 records sent, 205014.0 records/sec (200.21 MB/sec), 150.0 ms avg latency, 231.0 ms max latency.
   1034265 records sent, 206853.0 records/sec (202.00 MB/sec), 148.5 ms avg latency, 212.0 ms max latency.
   1025280 records sent, 205056.0 records/sec (200.25 MB/sec), 149.2 ms avg latency, 222.0 ms max latency.
   1033485 records sent, 206697.0 records/sec (201.85 MB/sec), 148.6 ms avg latency, 212.0 ms max latency.
   1036230 records sent, 207246.0 records/sec (202.39 MB/sec), 148.2 ms avg latency, 220.0 ms max latency.
   1034385 records sent, 206877.0 records/sec (202.03 MB/sec), 148.4 ms avg latency, 216.0 ms max latency.
   1013655 records sent, 201401.7 records/sec (196.68 MB/sec), 151.5 ms avg latency, 247.0 ms max latency.
   1035300 records sent, 206481.9 records/sec (201.64 MB/sec), 149.1 ms avg latency, 213.0 ms max latency.
   1035585 records sent, 207117.0 records/sec (202.26 MB/sec), 148.4 ms avg latency, 217.0 ms max latency.
   1035015 records sent, 205197.3 records/sec (200.39 MB/sec), 149.4 ms avg latency, 231.0 ms max latency.
   ```
   
   
   After this PR:
   ```
   363796 records sent, 72759.2 records/sec (71.05 MB/sec), 389.1 ms avg latency, 1005.0 ms max latency.
   992910 records sent, 198582.0 records/sec (193.93 MB/sec), 154.5 ms avg latency, 281.0 ms max latency.
   989655 records sent, 197931.0 records/sec (193.29 MB/sec), 156.4 ms avg latency, 250.0 ms max latency.
   1026900 records sent, 205380.0 records/sec (200.57 MB/sec), 149.6 ms avg latency, 217.0 ms max latency.
   1033515 records sent, 206703.0 records/sec (201.86 MB/sec), 148.5 ms avg latency, 205.0 ms max latency.
   1034775 records sent, 206955.0 records/sec (202.10 MB/sec), 148.5 ms avg latency, 201.0 ms max latency.
   1035420 records sent, 207084.0 records/sec (202.23 MB/sec), 148.3 ms avg latency, 210.0 ms max latency.
   1013130 records sent, 202626.0 records/sec (197.88 MB/sec), 151.6 ms avg latency, 216.0 ms max latency.
   1010295 records sent, 202059.0 records/sec (197.32 MB/sec), 150.9 ms avg latency, 215.0 ms max latency.
   1022640 records sent, 204528.0 records/sec (199.73 MB/sec), 151.2 ms avg latency, 219.0 ms max latency.
   1015950 records sent, 203190.0 records/sec (198.43 MB/sec), 151.2 ms avg latency, 232.0 ms max latency.
   1033725 records sent, 206745.0 records/sec (201.90 MB/sec), 148.5 ms avg latency, 208.0 ms max latency.
   1024905 records sent, 204981.0 records/sec (200.18 MB/sec), 149.9 ms avg latency, 213.0 ms max latency.
   1035720 records sent, 207144.0 records/sec (202.29 MB/sec), 148.3 ms avg latency, 203.0 ms max latency.
   998625 records sent, 199725.0 records/sec (195.04 MB/sec), 153.8 ms avg latency, 214.0 ms max latency.
   ```





[GitHub] [kafka] ijuma commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-807903146


   Thanks for the PR. Did we check the performance impact of this change?





[GitHub] [kafka] ccding edited a comment on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812293654


   Ran `bin/kafka-producer-perf-test.sh` with default settings and a 1KB record size.
   
   The result before applying this PR:
   ```
   1205625 records sent, 241125.0 records/sec (235.47 MB/sec), 127.4 ms avg latency, 204.0 ms max latency.
   1177965 records sent, 235593.0 records/sec (230.07 MB/sec), 130.2 ms avg latency, 202.0 ms max latency.
   1198860 records sent, 239772.0 records/sec (234.15 MB/sec), 128.2 ms avg latency, 197.0 ms max latency.
   1182420 records sent, 236484.0 records/sec (230.94 MB/sec), 129.8 ms avg latency, 201.0 ms max latency.
   1172340 records sent, 234468.0 records/sec (228.97 MB/sec), 131.0 ms avg latency, 204.0 ms max latency.
   1197570 records sent, 239514.0 records/sec (233.90 MB/sec), 128.3 ms avg latency, 203.0 ms max latency.
   1178820 records sent, 235764.0 records/sec (230.24 MB/sec), 130.2 ms avg latency, 225.0 ms max latency.
   1173870 records sent, 234774.0 records/sec (229.27 MB/sec), 130.8 ms avg latency, 201.0 ms max latency.
   1152990 records sent, 230598.0 records/sec (225.19 MB/sec), 133.1 ms avg latency, 212.0 ms max latency.
   ```
   
   The result after applying this PR:
   ```
   1147650 records sent, 229530.0 records/sec (224.15 MB/sec), 133.9 ms avg latency, 216.0 ms max latency.
   1184085 records sent, 236817.0 records/sec (231.27 MB/sec), 129.7 ms avg latency, 213.0 ms max latency.
   1213275 records sent, 242655.0 records/sec (236.97 MB/sec), 126.5 ms avg latency, 204.0 ms max latency.
   1176105 records sent, 235221.0 records/sec (229.71 MB/sec), 130.5 ms avg latency, 211.0 ms max latency.
   1143045 records sent, 228609.0 records/sec (223.25 MB/sec), 134.4 ms avg latency, 231.0 ms max latency.
   1113390 records sent, 222678.0 records/sec (217.46 MB/sec), 138.0 ms avg latency, 236.0 ms max latency.
   1133850 records sent, 226770.0 records/sec (221.46 MB/sec), 135.3 ms avg latency, 208.0 ms max latency.
   1063410 records sent, 212682.0 records/sec (207.70 MB/sec), 144.6 ms avg latency, 216.0 ms max latency.
   1128195 records sent, 225639.0 records/sec (220.35 MB/sec), 136.2 ms avg latency, 235.0 ms max latency.
   ```
   
   Performance decreases slightly, but the difference is not significant.
   





[GitHub] [kafka] junrao commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812709188


   @ccding: Thanks for the experimental results. It seems there is a 5-10% throughput drop with the new PR for 10MB segments. This may not be a big concern since it's an uncommon setting. It's interesting that the absolute throughput dropped significantly with 10MB segments compared with 1GB segments.
   
   Could you redo the tests for 100MB segments? Thanks.





[GitHub] [kafka] ccding edited a comment on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812282452


   Addressed the comments from @junrao.
   
   Also addressed the two problems we discussed offline:
   - flush the parent directory of a new segment during the segment's first flush: added the `needsFlushParentDir = needsRecovery || !fileAlreadyExists` check
   - flush the parent directory after flushing the log file and all index files
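
   The two fixes above can be sketched in Java. This is a hypothetical illustration, not the code in this PR: `SegmentFlushSketch`, `force`, `flushDir` and `dirFlushCount` are invented names. It derives the flag from `needsRecovery || !fileAlreadyExists`, forces the log file and every index file, and only then fsyncs the parent directory, once. It assumes a POSIX file system where a directory can be opened read-only through `FileChannel` and forced; on Windows that open is expected to fail.

   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   import java.util.List;

   // Hypothetical sketch of a segment's flush ordering; not Kafka's LogSegment.
   class SegmentFlushSketch {
       private final Path logFile;
       private final List<Path> indexFiles;
       // True for new or recovered segments, whose directory entries may not be durable yet.
       private boolean needsFlushParentDir;
       int dirFlushCount = 0; // for demonstration only

       SegmentFlushSketch(Path logFile, List<Path> indexFiles,
                          boolean fileAlreadyExists, boolean needsRecovery) {
           this.logFile = logFile;
           this.indexFiles = indexFiles;
           this.needsFlushParentDir = needsRecovery || !fileAlreadyExists;
       }

       void flush() throws IOException {
           force(logFile);                  // 1. the log file
           for (Path index : indexFiles) {  // 2. every index file
               force(index);
           }
           if (needsFlushParentDir) {       // 3. the parent directory, only once
               flushDir(logFile.toAbsolutePath().getParent());
               needsFlushParentDir = false;
               dirFlushCount++;
           }
       }

       private static void force(Path file) throws IOException {
           try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
               ch.force(true); // file data and metadata
           }
       }

       // Assumes POSIX: a directory can be opened read-only and fsynced.
       static void flushDir(Path dir) throws IOException {
           try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
               ch.force(true);
           }
       }
   }
   ```

   With this sketch, a freshly created segment fsyncs its directory on the first `flush()` only, while a cleanly reopened segment (`fileAlreadyExists = true`, `needsRecovery = false`) never does.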
   
   Please take a look @junrao 





[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606040591



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
+  /* whether or not we need to flush the parent dir during flush */

Review comment:
       Changed it to `during the next flush`.







[GitHub] [kafka] junrao commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606022097



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time) extends Logging {
+                               val time: Time,
+                               val needsFlushParentDir: Boolean = false) extends Logging {

Review comment:
       Could we add the new param to the javadoc?

##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
+  /* whether or not we need to flush the parent dir during flush */

Review comment:
       during flush => during the first flush?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -433,7 +440,8 @@ public static FileRecords open(File file,
     public static FileRecords open(File file,
                                    boolean fileAlreadyExists,
                                    int initFileSize,
-                                   boolean preallocate) throws IOException {
+                                   boolean preallocate,
+                                   boolean needsRecovery) throws IOException {

Review comment:
       This change seems unneeded?

##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -657,17 +668,19 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
+           needsRecovery: Boolean = true): LogSegment = {

Review comment:
       It seems that needsRecovery should default to false?







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605901483



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -249,6 +266,7 @@ public void renameTo(File f) throws IOException {
         } finally {
             this.file = f;
         }
+        needFlushParentDir.set(true);

Review comment:
       Fixed both.







[GitHub] [kafka] ccding edited a comment on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812744839


   > It's interesting that the absolute throughput dropped significantly with 10MB segments compared with 1GB segment.
   
   I think this is the cost of the extra flush. We have one extra flush per segment, which is 1 extra flush per 10,000 records for 10MB segments and 1KB records. With 1GB segments, there would be 1 extra flush per 1,000,000 records: 1/100 of the amortized extra cost.
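
   A quick check of these numbers, assuming binary units (1 KB = 2^10 bytes) and exactly one extra directory fsync per segment. The first line gives records per segment, and the second gives the ratio of amortized extra fsyncs per record:

   $$
   \frac{10\ \text{MB}}{1\ \text{KB}} = \frac{10 \cdot 2^{20}}{2^{10}} = 10{,}240 \approx 10^{4},
   \qquad
   \frac{1\ \text{GB}}{1\ \text{KB}} = \frac{2^{30}}{2^{10}} = 1{,}048{,}576 \approx 10^{6}
   $$

   $$
   \frac{1/1{,}048{,}576}{1/10{,}240} = \frac{10{,}240}{1{,}048{,}576} = \frac{1}{102.4} \approx \frac{1}{100}
   $$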





[GitHub] [kafka] junrao commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605293308



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {

Review comment:
       Ideally, we want to flush the parent dir first before setting needFlush to false.
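
       Here is a minimal sketch of this ordering. It uses a hypothetical `ParentDirFlusher` class rather than the real `FileRecords`. The flag is read with `get()`, the directory is fsynced, and only then is the flag cleared. An `IOException` from the directory fsync therefore leaves the flag set for a later `flush()` to retry. Two concurrent callers may both fsync the directory, which is redundant but harmless. Directory fsync through `FileChannel.open(dir, READ)` is assumed to work, which holds on Linux but not on Windows. The PR ultimately kept `getAndSet(false)`, on the reasoning that an `IOException` here takes the partition offline anyway.

       ```java
       import java.io.IOException;
       import java.nio.channels.FileChannel;
       import java.nio.file.Path;
       import java.nio.file.StandardOpenOption;
       import java.util.concurrent.atomic.AtomicBoolean;

       // Hypothetical sketch of the suggested ordering; not the FileRecords code in the PR.
       class ParentDirFlusher {
           private final FileChannel channel;
           private final Path file;
           private final AtomicBoolean needFlushParentDir;

           ParentDirFlusher(FileChannel channel, Path file, boolean needFlushParentDir) {
               this.channel = channel;
               this.file = file;
               this.needFlushParentDir = new AtomicBoolean(needFlushParentDir);
           }

           void flush() throws IOException {
               channel.force(true);
               if (needFlushParentDir.get()) {
                   // fsync the directory first; if this throws, the flag stays set
                   flushDir(file.toAbsolutePath().getParent());
                   needFlushParentDir.set(false);
               }
           }

           boolean needsParentDirFlush() {
               return needFlushParentDir.get();
           }

           // Assumes POSIX: a directory can be opened read-only and fsynced.
           static void flushDir(Path dir) throws IOException {
               try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
                   ch.force(true);
               }
           }
       }
       ```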

##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -490,11 +490,14 @@ class LogSegment private[log] (val log: FileRecords,
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = {

Review comment:
       Hmm, we need to pass needFlushParentDir to each of log.renameTo and index.renameTo to disable flushing, right?
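
       Here is a hypothetical sketch of forwarding the flag through each rename. It is written in Java rather than the Scala of `LogSegment`, and `RenameSketch`, `TrackedFile` and its fields are invented names. A rename marks the parent directory for fsync only when the caller passes `true`. For example, `changeFileSuffixes(files, "", ".deleted", false)` renames `0.log` to `0.log.deleted` without requesting a directory fsync, as a delete path might. The suffixes here are only examples.

       ```java
       import java.io.IOException;
       import java.nio.file.Files;
       import java.nio.file.Path;
       import java.nio.file.StandardCopyOption;
       import java.util.List;

       // Hypothetical sketch, not Kafka code: a per-call flag decides whether a rename
       // marks the parent directory as needing an fsync on the next flush.
       class RenameSketch {
           static final class TrackedFile {
               Path path;
               boolean needFlushParentDir;

               TrackedFile(Path path) {
                   this.path = path;
               }

               void renameTo(Path target, boolean flushParentDir) throws IOException {
                   // Same-directory rename; ATOMIC_MOVE throws instead of falling back to a copy.
                   Files.move(path, target, StandardCopyOption.ATOMIC_MOVE);
                   path = target;
                   if (flushParentDir) {
                       needFlushParentDir = true;
                   }
               }
           }

           // Loosely mirrors changeFileSuffixes(oldSuffix, newSuffix, needFlushParentDir):
           // the caller's flag is forwarded to every individual rename.
           static void changeFileSuffixes(List<TrackedFile> files, String oldSuffix,
                                          String newSuffix, boolean needFlushParentDir) throws IOException {
               for (TrackedFile f : files) {
                   String name = f.path.getFileName().toString();
                   if (!name.endsWith(oldSuffix)) {
                       continue;
                   }
                   String base = name.substring(0, name.length() - oldSuffix.length());
                   f.renameTo(f.path.resolveSibling(base + newSuffix), needFlushParentDir);
               }
           }
       }
       ```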

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +442,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable);

Review comment:
       This is OK, but not the most accurate. We only need to set the flush flag to true if the file is mutable and log recovery is needed.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {
+            Utils.flushParentDir(file.toPath());
+        }
+    }
+
+    /**
+     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
+     */
+    public void flushParentDir() throws IOException {
+        needFlushParentDir.set(false);

Review comment:
       Ideally, we want to flush the parent dir first before setting needFlush to false.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -848,6 +849,7 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
       try {
         Files.createDirectories(dir.toPath)
+        Utils.flushParentDir(dir.toPath)

Review comment:
       I am wondering whether we should also flush the parent dir when we delete a log. This is not strictly required for every delete, so one option is to flush every parent dir when closing the LogManager.
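
       The "flush every parent dir when closing" option could look roughly like the following sketch. `ShutdownDirFlusher.flushAll` is a hypothetical helper, not a Kafka API. Each log directory is opened read-only and forced once. Failures are collected instead of aborting the loop, so one bad disk does not prevent the others from being flushed. The sketch assumes a POSIX file system where an fsync on a directory persists removals of its entries, such as deleted partition directories.

       ```java
       import java.io.IOException;
       import java.nio.channels.FileChannel;
       import java.nio.file.Path;
       import java.nio.file.StandardOpenOption;
       import java.util.ArrayList;
       import java.util.List;

       // Hypothetical sketch: fsync every log dir once at shutdown instead of on every delete.
       class ShutdownDirFlusher {
           // Returns the directories that could not be flushed, so the caller can log them.
           static List<Path> flushAll(List<Path> logDirs) {
               List<Path> failed = new ArrayList<>();
               for (Path dir : logDirs) {
                   try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
                       ch.force(true); // persist entry changes (e.g. deletions) in this log dir
                   } catch (IOException e) {
                       failed.add(dir); // keep going with the remaining dirs
                   }
               }
               return failed;
           }
       }
       ```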







[GitHub] [kafka] ccding edited a comment on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding edited a comment on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812744839


   > It's interesting that the absolute throughput dropped significantly with 10MB segments compared with 1GB segment.
   
   I think this is the cost of the extra flush. We have one extra flush per segment, which is 1 extra flush per 10,000 records for 10MB segments and 1KB records. With 1GB segments, there would be 1 extra flush per 1,000,000 records: 1/100 of the cost.





[GitHub] [kafka] ccding commented on pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#issuecomment-812293654


   Ran `bin/kafka-producer-perf-test.sh` with default settings and 1KB record size.
   
   The result before applying this PR:
   ```
   1205625 records sent, 241125.0 records/sec (235.47 MB/sec), 127.4 ms avg latency, 204.0 ms max latency.
   1177965 records sent, 235593.0 records/sec (230.07 MB/sec), 130.2 ms avg latency, 202.0 ms max latency.
   1198860 records sent, 239772.0 records/sec (234.15 MB/sec), 128.2 ms avg latency, 197.0 ms max latency.
   1182420 records sent, 236484.0 records/sec (230.94 MB/sec), 129.8 ms avg latency, 201.0 ms max latency.
   1172340 records sent, 234468.0 records/sec (228.97 MB/sec), 131.0 ms avg latency, 204.0 ms max latency.
   1197570 records sent, 239514.0 records/sec (233.90 MB/sec), 128.3 ms avg latency, 203.0 ms max latency.
   1178820 records sent, 235764.0 records/sec (230.24 MB/sec), 130.2 ms avg latency, 225.0 ms max latency.
   1173870 records sent, 234774.0 records/sec (229.27 MB/sec), 130.8 ms avg latency, 201.0 ms max latency.
   1152990 records sent, 230598.0 records/sec (225.19 MB/sec), 133.1 ms avg latency, 212.0 ms max latency.
   ```
   
   The result after applying this PR:
   ```
   1147650 records sent, 229530.0 records/sec (224.15 MB/sec), 133.9 ms avg latency, 216.0 ms max latency.
   1184085 records sent, 236817.0 records/sec (231.27 MB/sec), 129.7 ms avg latency, 213.0 ms max latency.
   1213275 records sent, 242655.0 records/sec (236.97 MB/sec), 126.5 ms avg latency, 204.0 ms max latency.
   1176105 records sent, 235221.0 records/sec (229.71 MB/sec), 130.5 ms avg latency, 211.0 ms max latency.
   1143045 records sent, 228609.0 records/sec (223.25 MB/sec), 134.4 ms avg latency, 231.0 ms max latency.
   1113390 records sent, 222678.0 records/sec (217.46 MB/sec), 138.0 ms avg latency, 236.0 ms max latency.
   1133850 records sent, 226770.0 records/sec (221.46 MB/sec), 135.3 ms avg latency, 208.0 ms max latency.
   1063410 records sent, 212682.0 records/sec (207.70 MB/sec), 144.6 ms avg latency, 216.0 ms max latency.
   1128195 records sent, 225639.0 records/sec (220.35 MB/sec), 136.2 ms avg latency, 235.0 ms max latency.
   ```
   
   No significant difference.
   





[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605901016



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +445,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists);

Review comment:
       Fixed by passing in the `hadCleanShutdown` flag.







[GitHub] [kafka] junrao merged pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #10405:
URL: https://github.com/apache/kafka/pull/10405


   





[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605901984



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -195,6 +199,17 @@ public int append(MemoryRecords records) throws IOException {
      */
     public void flush() throws IOException {
         channel.force(true);
+        if (needFlushParentDir.getAndSet(false)) {

Review comment:
       Per our offline discussion, we are leaving this unchanged. If the flush throws an IOException, the partition goes offline and flush will not be called on it again.







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r606040360



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time) extends Logging {
+                               val time: Time,
+                               val needsFlushParentDir: Boolean = false) extends Logging {

Review comment:
       Done.







[GitHub] [kafka] ccding commented on a change in pull request #10405: KAFKA-3968: fsync the parent directory of a segment file when the file is created

Posted by GitBox <gi...@apache.org>.
ccding commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r605902765



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -490,11 +490,14 @@ class LogSegment private[log] (val log: FileRecords,
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = {

Review comment:
       Set the flag to false for the rename.



