You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:19 UTC

[22/37] git commit: KAFKA-1539 Fsync offset checkpoint file after writing.

KAFKA-1539 Fsync offset checkpoint file after writing.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e4b0841
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e4b0841
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e4b0841

Branch: refs/heads/transactional_messaging
Commit: 1e4b0841b37e9e6526d7a7a7c643b1369d9f03c5
Parents: 592678e
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Jul 21 10:22:50 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Mon Jul 21 10:22:50 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/OffsetCheckpoint.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1e4b0841/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 7af2f43..8c5b054 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -34,7 +34,8 @@ class OffsetCheckpoint(val file: File) extends Logging {
       // write to temp file and then swap with the existing file
       val temp = new File(file.getAbsolutePath + ".tmp")
 
-      val writer = new BufferedWriter(new FileWriter(temp))
+      val fileOutputStream = new FileOutputStream(temp)
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
       try {
         // write the current version
         writer.write(0.toString)
@@ -50,8 +51,9 @@ class OffsetCheckpoint(val file: File) extends Logging {
           writer.newLine()
         }
       
-        // flush and overwrite old file
+        // flush the buffer and then fsync the underlying file
         writer.flush()
+        fileOutputStream.getFD().sync()
       } finally {
         writer.close()
       }