Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/15 16:10:53 UTC

[GitHub] [kafka] satishd opened a new pull request #11060: MINOR Refactored the existing CheckpointFile in core module and moved to server common module and introduced it as SnapshotFile.

satishd opened a new pull request #11060:
URL: https://github.com/apache/kafka/pull/11060


   MINOR Refactored the existing CheckpointFile in the core module, moved it to the server-common module, and introduced it as SnapshotFile.
    - Refactored CheckpointFile into the server-common module as a Java class; it is reused by LeaderEpochCheckpointFile and OffsetCheckpointFile.
    - It is also used by CommittedOffsetsFile, which checkpoints remote log metadata partitions with their respective offsets in the default RemoteLogMetadataManager implementation.
    - Existing tests already cover LeaderEpochCheckpointFile and OffsetCheckpointFile.
    - Added tests for the newly introduced CommittedOffsetsFile class.
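
To illustrate the kind of entry such a file could hold, here is a minimal sketch of a formatter that turns a partition/offset pair into one line of text and back. The class name `PartitionOffsetFormatterSketch`, the method names `toLine` and `fromLine`, and the space-separated layout are assumptions made for illustration; they are not taken from this PR.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;

// Hypothetical formatter (not the PR's class): one "<partition> <offset>" pair per line.
final class PartitionOffsetFormatterSketch {
    private PartitionOffsetFormatterSketch() { }

    // Serialize an entry as "<partition> <offset>".
    static String toLine(Map.Entry<Integer, Long> entry) {
        return entry.getKey() + " " + entry.getValue();
    }

    // Parse a line back into an entry; an empty Optional signals a malformed line.
    static Optional<Map.Entry<Integer, Long>> fromLine(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            int partition = Integer.parseInt(parts[0]);
            long offset = Long.parseLong(parts[1]);
            Map.Entry<Integer, Long> entry = new SimpleImmutableEntry<>(partition, offset);
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

Returning an empty `Optional` for a bad line lets the caller decide how to report the error, which mirrors the `Option[T]` return type of `fromLine` in the removed Scala trait.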
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
satishd commented on pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#issuecomment-907635550


   Thanks @junrao for pointing that out. The earlier test failure is fixed by the latest commit and no longer occurs. The failures in the latest run are unrelated to these changes.





[GitHub] [kafka] junrao merged pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #11060:
URL: https://github.com/apache/kafka/pull/11060


   





[GitHub] [kafka] junrao commented on a change in pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#discussion_r671517216



##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -16,127 +16,41 @@
   */
 package kafka.server.checkpoints
 
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
-
 import kafka.server.LogDirFailureChannel
-import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.{Seq, mutable}
+import org.apache.kafka.server.common.SnapshotFile
+import org.apache.kafka.server.common.SnapshotFile.EntryFormatter
 
-trait CheckpointFileFormatter[T]{
-  def toLine(entry: T): String
-
-  def fromLine(line: String): Option[T]
-}
-
-class CheckpointReadBuffer[T](location: String,
-                              reader: BufferedReader,
-                              version: Int,
-                              formatter: CheckpointFileFormatter[T]) extends Logging {
-  def read(): Seq[T] = {
-    def malformedLineException(line: String) =
-      new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
-    var line: String = null
-    try {
-      line = reader.readLine()
-      if (line == null)
-        return Seq.empty
-      line.toInt match {
-        case fileVersion if fileVersion == version =>
-          line = reader.readLine()
-          if (line == null)
-            return Seq.empty
-          val expectedSize = line.toInt
-          val entries = mutable.Buffer[T]()
-          line = reader.readLine()
-          while (line != null) {
-            val entry = formatter.fromLine(line)
-            entry match {
-              case Some(e) =>
-                entries += e
-                line = reader.readLine()
-              case _ => throw malformedLineException(line)
-            }
-          }
-          if (entries.size != expectedSize)
-            throw new IOException(s"Expected $expectedSize entries in checkpoint file ($location), but found only ${entries.size}")
-          entries
-        case _ =>
-          throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)
-      }
-    } catch {
-      case _: NumberFormatException => throw malformedLineException(line)
-    }
-  }
-}
+import java.io._
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
 
 class CheckpointFile[T](val file: File,
                         version: Int,
-                        formatter: CheckpointFileFormatter[T],
+                        formatter: EntryFormatter[T],
                         logDirFailureChannel: LogDirFailureChannel,
-                        logDir: String) extends Logging {
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
+                        logDir: String) {

Review comment:
       Do we need CheckpointFile? Could we just extend SnapshotFile to take an IOExceptionHandler and logDir?







[GitHub] [kafka] satishd commented on pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
satishd commented on pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#issuecomment-881843364


   Thanks @junrao for the comment; I replied inline.





[GitHub] [kafka] junrao commented on a change in pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#discussion_r696789655



##########
File path: server-common/src/main/java/org/apache/kafka/server/common/SnapshotFile.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class represents a utility to capture a snapshot in a file. It writes down to the file in the below format.
+ *
+ * ========= File beginning =========
+ * version: int
+ * entries-count: int
+ * entry-as-string-on-each-line
+ * ========= File end ===============
+ *
+ * Each entry is represented as a string on each line in the snapshot file. {@link EntryFormatter} is used
+ * to convert the entry into a string and vice versa.
+ *
+ * @param <T> entry type.
+ */
+public class SnapshotFile<T> {

Review comment:
       Since KRaft introduced Snapshot for metadata, perhaps we could call this CheckpointFile and rename the existing CheckpointFile to something like CheckpointFileWithFailureHandler.
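
The file layout described in the Javadoc above (a version line, an entry-count line, then one entry per line) can be sketched roughly as follows. This is an in-memory illustration under stated assumptions, not the PR's implementation: the class name `CheckpointFormatSketch` and its methods are hypothetical, entries are plain strings, and the content is built as a string rather than written to a temporary file and moved into place.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the "version / entries-count / entries" layout.
final class CheckpointFormatSketch {
    private CheckpointFormatSketch() { }

    // Render the version line, the entry-count line, then one entry per line.
    static String write(int version, List<String> entries) {
        StringBuilder sb = new StringBuilder();
        sb.append(version).append('\n');
        sb.append(entries.size()).append('\n');
        for (String entry : entries) {
            sb.append(entry).append('\n');
        }
        return sb.toString();
    }

    // Parse the layout back, validating the version and the declared entry count.
    static List<String> read(int expectedVersion, String content) throws IOException {
        BufferedReader reader = new BufferedReader(new StringReader(content));
        String line = reader.readLine();
        if (line == null) {
            return new ArrayList<>(); // empty content: no entries
        }
        try {
            int version = Integer.parseInt(line);
            if (version != expectedVersion) {
                throw new IOException("Unrecognized version: " + version);
            }
            line = reader.readLine();
            if (line == null) {
                return new ArrayList<>(); // version line only: no entries
            }
            int expectedSize = Integer.parseInt(line);
            List<String> entries = new ArrayList<>();
            while ((line = reader.readLine()) != null) {
                entries.add(line);
            }
            if (entries.size() != expectedSize) {
                throw new IOException("Expected " + expectedSize
                        + " entries but found " + entries.size());
            }
            return entries;
        } catch (NumberFormatException e) {
            throw new IOException("Malformed line: '" + line + "'", e);
        }
    }
}
```

Storing the entry count up front lets a reader detect a truncated or partially written file, which is the check the removed `CheckpointReadBuffer` performed with `expectedSize`.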

##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -16,127 +16,41 @@
   */
 package kafka.server.checkpoints
 
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
-
 import kafka.server.LogDirFailureChannel
-import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.{Seq, mutable}
+import org.apache.kafka.server.common.SnapshotFile
+import org.apache.kafka.server.common.SnapshotFile.EntryFormatter
 
-trait CheckpointFileFormatter[T]{
-  def toLine(entry: T): String
-
-  def fromLine(line: String): Option[T]
-}
-
-class CheckpointReadBuffer[T](location: String,
-                              reader: BufferedReader,
-                              version: Int,
-                              formatter: CheckpointFileFormatter[T]) extends Logging {
-  def read(): Seq[T] = {
-    def malformedLineException(line: String) =
-      new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
-    var line: String = null
-    try {
-      line = reader.readLine()
-      if (line == null)
-        return Seq.empty
-      line.toInt match {
-        case fileVersion if fileVersion == version =>
-          line = reader.readLine()
-          if (line == null)
-            return Seq.empty
-          val expectedSize = line.toInt
-          val entries = mutable.Buffer[T]()
-          line = reader.readLine()
-          while (line != null) {
-            val entry = formatter.fromLine(line)
-            entry match {
-              case Some(e) =>
-                entries += e
-                line = reader.readLine()
-              case _ => throw malformedLineException(line)
-            }
-          }
-          if (entries.size != expectedSize)
-            throw new IOException(s"Expected $expectedSize entries in checkpoint file ($location), but found only ${entries.size}")
-          entries
-        case _ =>
-          throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)
-      }
-    } catch {
-      case _: NumberFormatException => throw malformedLineException(line)
-    }
-  }
-}
+import java.io._
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
 
 class CheckpointFile[T](val file: File,
                         version: Int,
-                        formatter: CheckpointFileFormatter[T],
+                        formatter: EntryFormatter[T],
                         logDirFailureChannel: LogDirFailureChannel,
-                        logDir: String) extends Logging {
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
+                        logDir: String) {
+  private val snapshotFile = new SnapshotFile[T](file, version, formatter)
 
   def write(entries: Iterable[T]): Unit = {
-    lock synchronized {
       try {
-        // write to temp file and then swap with the existing file
-        val fileOutputStream = new FileOutputStream(tempPath.toFile)
-        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-        try {
-          writer.write(version.toString)
-          writer.newLine()
-
-          writer.write(entries.size.toString)
-          writer.newLine()
-
-          entries.foreach { entry =>
-            writer.write(formatter.toLine(entry))
-            writer.newLine()
-          }
-
-          writer.flush()
-          fileOutputStream.getFD().sync()
-        } finally {
-          writer.close()
-        }
-
-        Utils.atomicMoveWithFallback(tempPath, path)
+        snapshotFile.write(entries.toSeq.asJava);

Review comment:
       No need for the trailing `;`.







[GitHub] [kafka] satishd commented on a change in pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
satishd commented on a change in pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#discussion_r671627863



##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -16,127 +16,41 @@
   */
 package kafka.server.checkpoints
 
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
-
 import kafka.server.LogDirFailureChannel
-import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.{Seq, mutable}
+import org.apache.kafka.server.common.SnapshotFile
+import org.apache.kafka.server.common.SnapshotFile.EntryFormatter
 
-trait CheckpointFileFormatter[T]{
-  def toLine(entry: T): String
-
-  def fromLine(line: String): Option[T]
-}
-
-class CheckpointReadBuffer[T](location: String,
-                              reader: BufferedReader,
-                              version: Int,
-                              formatter: CheckpointFileFormatter[T]) extends Logging {
-  def read(): Seq[T] = {
-    def malformedLineException(line: String) =
-      new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
-    var line: String = null
-    try {
-      line = reader.readLine()
-      if (line == null)
-        return Seq.empty
-      line.toInt match {
-        case fileVersion if fileVersion == version =>
-          line = reader.readLine()
-          if (line == null)
-            return Seq.empty
-          val expectedSize = line.toInt
-          val entries = mutable.Buffer[T]()
-          line = reader.readLine()
-          while (line != null) {
-            val entry = formatter.fromLine(line)
-            entry match {
-              case Some(e) =>
-                entries += e
-                line = reader.readLine()
-              case _ => throw malformedLineException(line)
-            }
-          }
-          if (entries.size != expectedSize)
-            throw new IOException(s"Expected $expectedSize entries in checkpoint file ($location), but found only ${entries.size}")
-          entries
-        case _ =>
-          throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)
-      }
-    } catch {
-      case _: NumberFormatException => throw malformedLineException(line)
-    }
-  }
-}
+import java.io._
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
 
 class CheckpointFile[T](val file: File,
                         version: Int,
-                        formatter: CheckpointFileFormatter[T],
+                        formatter: EntryFormatter[T],
                         logDirFailureChannel: LogDirFailureChannel,
-                        logDir: String) extends Logging {
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
+                        logDir: String) {

Review comment:
       I started with that approach while refactoring the existing class: I implemented an `IOExceptionHandler` class that takes `logDir` and `logDirFailureChannel`, and reused it in both `LeaderEpochCheckpointFile` and `OffsetCheckpointFile`. In the end I preferred to keep `SnapshotFile` simple, so that implementors can add their own logic around the write and read methods. This allows custom read/write logic beyond `IOException` handling alone.
   Either way I had to create a new class, and I preferred to extend the `SnapshotFile` class. I do not have a strong opinion on this; I am fine with either approach.
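
As a rough sketch of the design described here, the wrapper below keeps the reusable file class free of broker-specific concerns and puts the failure handling in the class that wraps it. All names (`PlainEntryStoreSketch`, `FailureChannelSketch`, `FailureAwareStoreSketch`, `markOffline`) are hypothetical stand-ins, and `UncheckedIOException` is used in place of a storage-specific exception; this is not the code in the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical stand-in for the plain, reusable file class: it only stores
// and loads entries and may fail with IOException.
interface PlainEntryStoreSketch<T> {
    void write(List<T> entries) throws IOException;
    List<T> read() throws IOException;
}

// Hypothetical stand-in for a channel that records a failed log directory.
interface FailureChannelSketch {
    void markOffline(String logDir, String message, IOException cause);
}

// Wrapper that delegates the I/O and owns the failure handling.
final class FailureAwareStoreSketch<T> {
    private final PlainEntryStoreSketch<T> delegate;
    private final FailureChannelSketch channel;
    private final String logDir;

    FailureAwareStoreSketch(PlainEntryStoreSketch<T> delegate,
                            FailureChannelSketch channel,
                            String logDir) {
        this.delegate = delegate;
        this.channel = channel;
        this.logDir = logDir;
    }

    void write(List<T> entries) {
        try {
            delegate.write(entries);
        } catch (IOException e) {
            throw fail("Error while writing to checkpoint file", e);
        }
    }

    List<T> read() {
        try {
            return delegate.read();
        } catch (IOException e) {
            throw fail("Error while reading checkpoint file", e);
        }
    }

    // Report the failed directory, then surface an unchecked exception to the caller.
    private UncheckedIOException fail(String what, IOException cause) {
        String message = what + " in dir " + logDir;
        channel.markOffline(logDir, message, cause);
        return new UncheckedIOException(message, cause);
    }
}
```

With this split, a caller that does not need directory-failure reporting can use the plain class directly, while broker code goes through the wrapper.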







[GitHub] [kafka] satishd commented on pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
satishd commented on pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#issuecomment-907318976


   Thanks @junrao for the comments. Addressed them with the latest commit. 





[GitHub] [kafka] satishd commented on pull request #11060: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile.

Posted by GitBox <gi...@apache.org>.
satishd commented on pull request #11060:
URL: https://github.com/apache/kafka/pull/11060#issuecomment-881150117


   @junrao This is a minor refactoring. `SnapshotFile` can be reused by the default RLMM (RemoteLogMetadataManager) to store committed offsets for the remote log metadata topic.

