Posted to commits@kafka.apache.org by ju...@apache.org on 2021/08/30 15:49:27 UTC

[kafka] branch trunk updated: MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile. (#11060)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 923bc2e  MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile. (#11060)
923bc2e is described below

commit 923bc2e9f78c0d0f14e4adf6fc50bd9abdd416da
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Mon Aug 30 21:13:25 2021 +0530

    MINOR Refactored the existing CheckpointFile in core module, moved to server-common module and introduced it as SnapshotFile. (#11060)
    
    MINOR Refactored the existing CheckpointFile in the core module and moved it to the server-common module.
    
    CheckpointFile is refactored into a Java class in the server-common module and is reused by LeaderEpochCheckpointFile and OffsetCheckpointFile through the new CheckpointFileWithFailureHandler wrapper.
    It will also be used by CommittedOffsetsFile, which checkpoints remote log metadata partitions with their respective offsets in the default RemoteLogMetadataManager implementation.
    Existing tests cover LeaderEpochCheckpointFile and OffsetCheckpointFile.
    
    Reviewers: Jun Rao <ju...@gmail.com>
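
The new org.apache.kafka.server.common.CheckpointFile (added below) takes a pluggable
EntryFormatter that converts each entry to one line of text and back. A minimal formatter
for a hypothetical two-field entry might look like the following sketch; BrokerEpochEntry
and BrokerEpochFormatter are illustrative names, not part of this commit:

    import org.apache.kafka.server.common.CheckpointFile.EntryFormatter;

    import java.util.Optional;

    // Hypothetical entry type, used only for this sketch.
    class BrokerEpochEntry {
        final int brokerId;
        final long epoch;

        BrokerEpochEntry(int brokerId, long epoch) {
            this.brokerId = brokerId;
            this.epoch = epoch;
        }
    }

    // Mirrors the pattern of LeaderEpochCheckpointFile.Formatter: one
    // whitespace-separated line per entry, Optional.empty() on malformed input.
    class BrokerEpochFormatter implements EntryFormatter<BrokerEpochEntry> {
        @Override
        public String toString(BrokerEpochEntry entry) {
            return entry.brokerId + " " + entry.epoch;
        }

        @Override
        public Optional<BrokerEpochEntry> fromString(String line) {
            String[] parts = line.split("\\s+");
            if (parts.length != 2)
                return Optional.empty(); // CheckpointFile turns this into an IOException
            try {
                return Optional.of(new BrokerEpochEntry(Integer.parseInt(parts[0]), Long.parseLong(parts[1])));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        }
    }
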
---
 .../kafka/server/checkpoints/CheckpointFile.scala  | 142 ---------------
 .../CheckpointFileWithFailureHandler.scala         |  56 ++++++
 .../checkpoints/LeaderEpochCheckpointFile.scala    |  19 +-
 .../server/checkpoints/OffsetCheckpointFile.scala  |  19 +-
 ...pochCheckpointFileWithFailureHandlerTest.scala} |   2 +-
 ...fsetCheckpointFileWithFailureHandlerTest.scala} |   4 +-
 .../apache/kafka/server/common/CheckpointFile.java | 195 +++++++++++++++++++++
 7 files changed, 274 insertions(+), 163 deletions(-)

diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
deleted file mode 100644
index 2ab9ab9..0000000
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server.checkpoints
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.nio.file.{FileAlreadyExistsException, Files, Paths}
-
-import kafka.server.LogDirFailureChannel
-import kafka.utils.Logging
-import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection.{Seq, mutable}
-
-trait CheckpointFileFormatter[T]{
-  def toLine(entry: T): String
-
-  def fromLine(line: String): Option[T]
-}
-
-class CheckpointReadBuffer[T](location: String,
-                              reader: BufferedReader,
-                              version: Int,
-                              formatter: CheckpointFileFormatter[T]) extends Logging {
-  def read(): Seq[T] = {
-    def malformedLineException(line: String) =
-      new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
-    var line: String = null
-    try {
-      line = reader.readLine()
-      if (line == null)
-        return Seq.empty
-      line.toInt match {
-        case fileVersion if fileVersion == version =>
-          line = reader.readLine()
-          if (line == null)
-            return Seq.empty
-          val expectedSize = line.toInt
-          val entries = mutable.Buffer[T]()
-          line = reader.readLine()
-          while (line != null) {
-            val entry = formatter.fromLine(line)
-            entry match {
-              case Some(e) =>
-                entries += e
-                line = reader.readLine()
-              case _ => throw malformedLineException(line)
-            }
-          }
-          if (entries.size != expectedSize)
-            throw new IOException(s"Expected $expectedSize entries in checkpoint file ($location), but found only ${entries.size}")
-          entries
-        case _ =>
-          throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)
-      }
-    } catch {
-      case _: NumberFormatException => throw malformedLineException(line)
-    }
-  }
-}
-
-class CheckpointFile[T](val file: File,
-                        version: Int,
-                        formatter: CheckpointFileFormatter[T],
-                        logDirFailureChannel: LogDirFailureChannel,
-                        logDir: String) extends Logging {
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-
-  try Files.createFile(file.toPath) // create the file if it doesn't exist
-  catch { case _: FileAlreadyExistsException => }
-
-  def write(entries: Iterable[T]): Unit = {
-    lock synchronized {
-      try {
-        // write to temp file and then swap with the existing file
-        val fileOutputStream = new FileOutputStream(tempPath.toFile)
-        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-        try {
-          writer.write(version.toString)
-          writer.newLine()
-
-          writer.write(entries.size.toString)
-          writer.newLine()
-
-          entries.foreach { entry =>
-            writer.write(formatter.toLine(entry))
-            writer.newLine()
-          }
-
-          writer.flush()
-          fileOutputStream.getFD().sync()
-        } finally {
-          writer.close()
-        }
-
-        Utils.atomicMoveWithFallback(tempPath, path)
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-          throw new KafkaStorageException(msg, e)
-      }
-    }
-  }
-
-  def read(): Seq[T] = {
-    lock synchronized {
-      try {
-        val reader = Files.newBufferedReader(path)
-        try {
-          val checkpointBuffer = new CheckpointReadBuffer[T](file.getAbsolutePath, reader, version, formatter)
-          checkpointBuffer.read()
-        } finally {
-          reader.close()
-        }
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-          throw new KafkaStorageException(msg, e)
-      }
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala
new file mode 100644
index 0000000..7021c67
--- /dev/null
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFileWithFailureHandler.scala
@@ -0,0 +1,56 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server.checkpoints
+
+import kafka.server.LogDirFailureChannel
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.server.common.CheckpointFile
+import CheckpointFile.EntryFormatter
+
+import java.io._
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class CheckpointFileWithFailureHandler[T](val file: File,
+                                          version: Int,
+                                          formatter: EntryFormatter[T],
+                                          logDirFailureChannel: LogDirFailureChannel,
+                                          logDir: String) {
+  private val checkpointFile = new CheckpointFile[T](file, version, formatter)
+
+  def write(entries: Iterable[T]): Unit = {
+    try {
+      checkpointFile.write(entries.toSeq.asJava)
+    } catch {
+      case e: IOException =>
+        val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
+        logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
+  def read(): Seq[T] = {
+    try {
+      checkpointFile.read().asScala
+    } catch {
+      case e: IOException =>
+        val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
+        logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index c9a0f62..c772b82 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -16,12 +16,13 @@
  */
 package kafka.server.checkpoints
 
-import java.io._
-import java.util.regex.Pattern
-
 import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
+import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
 
+import java.io._
+import java.util.Optional
+import java.util.regex.Pattern
 import scala.collection._
 
 trait LeaderEpochCheckpoint {
@@ -36,15 +37,15 @@ object LeaderEpochCheckpointFile {
 
   def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
 
-  object Formatter extends CheckpointFileFormatter[EpochEntry] {
+  object Formatter extends EntryFormatter[EpochEntry] {
 
-    override def toLine(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
+    override def toString(entry: EpochEntry): String = s"${entry.epoch} ${entry.startOffset}"
 
-    override def fromLine(line: String): Option[EpochEntry] = {
+    override def fromString(line: String): Optional[EpochEntry] = {
       WhiteSpacesPattern.split(line) match {
         case Array(epoch, offset) =>
-          Some(EpochEntry(epoch.toInt, offset.toLong))
-        case _ => None
+          Optional.of(EpochEntry(epoch.toInt, offset.toLong))
+        case _ => Optional.empty()
       }
     }
 
@@ -65,7 +66,7 @@ object LeaderEpochCheckpointFile {
 class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
   import LeaderEpochCheckpointFile._
 
-  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
+  val checkpoint = new CheckpointFileWithFailureHandler[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
 
   def write(epochs: Iterable[EpochEntry]): Unit = checkpoint.write(epochs)
 
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 722f609..f7b83ea 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -16,29 +16,30 @@
   */
 package kafka.server.checkpoints
 
-import java.io._
-import java.util.regex.Pattern
-
 import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.common.CheckpointFile.EntryFormatter
 
+import java.io._
+import java.util.Optional
+import java.util.regex.Pattern
 import scala.collection._
 
 object OffsetCheckpointFile {
   private val WhiteSpacesPattern = Pattern.compile("\\s+")
   private[checkpoints] val CurrentVersion = 0
 
-  object Formatter extends CheckpointFileFormatter[(TopicPartition, Long)] {
-    override def toLine(entry: (TopicPartition, Long)): String = {
+  object Formatter extends EntryFormatter[(TopicPartition, Long)] {
+    override def toString(entry: (TopicPartition, Long)): String = {
       s"${entry._1.topic} ${entry._1.partition} ${entry._2}"
     }
 
-    override def fromLine(line: String): Option[(TopicPartition, Long)] = {
+    override def fromString(line: String): Optional[(TopicPartition, Long)] = {
       WhiteSpacesPattern.split(line) match {
         case Array(topic, partition, offset) =>
-          Some(new TopicPartition(topic, partition.toInt), offset.toLong)
-        case _ => None
+          Optional.of(new TopicPartition(topic, partition.toInt), offset.toLong)
+        case _ => Optional.empty()
       }
     }
   }
@@ -61,7 +62,7 @@ trait OffsetCheckpoint {
  *  -----checkpoint file end----------
  */
 class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
-  val checkpoint = new CheckpointFile[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
+  val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
     OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
 
   def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileWithFailureHandlerTest.scala
similarity index 96%
rename from core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
rename to core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileWithFailureHandlerTest.scala
index 136852c..5ac9202 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileWithFailureHandlerTest.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
-class LeaderEpochCheckpointFileTest extends Logging {
+class LeaderEpochCheckpointFileWithFailureHandlerTest extends Logging {
 
   @Test
   def shouldPersistAndOverwriteAndReloadFile(): Unit ={
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
similarity index 96%
rename from core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
rename to core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
index 738729a..4889c54 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileWithFailureHandlerTest.scala
@@ -26,7 +26,7 @@ import org.mockito.Mockito
 
 import scala.collection.Map
 
-class OffsetCheckpointFileTest extends Logging {
+class OffsetCheckpointFileWithFailureHandlerTest extends Logging {
 
   @Test
   def shouldPersistAndOverwriteAndReloadFile(): Unit = {
@@ -93,7 +93,7 @@ class OffsetCheckpointFileTest extends Logging {
   def shouldThrowIfVersionIsNotRecognised(): Unit = {
     val file = TestUtils.tempFile()
     val logDirFailureChannel = new LogDirFailureChannel(10)
-    val checkpointFile = new CheckpointFile(file, OffsetCheckpointFile.CurrentVersion + 1,
+    val checkpointFile = new CheckpointFileWithFailureHandler(file, OffsetCheckpointFile.CurrentVersion + 1,
       OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
     checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
     assertThrows(classOf[KafkaStorageException], () => new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read())
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
new file mode 100644
index 0000000..2f3ccca
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class is a utility to capture a checkpoint in a file. It writes to the file in the format below.
+ *
+ * ========= File beginning =========
+ * version: int
+ * entries-count: int
+ * entry-as-string-on-each-line
+ * ========= File end ===============
+ *
+ * Each entry is represented as a string on each line in the checkpoint file. {@link EntryFormatter} is used
+ * to convert the entry into a string and vice versa.
+ *
+ * @param <T> entry type.
+ */
+public class CheckpointFile<T> {
+
+    private final int version;
+    private final EntryFormatter<T> formatter;
+    private final Object lock = new Object();
+    private final Path absolutePath;
+    private final Path tempPath;
+
+    public CheckpointFile(File file,
+                          int version,
+                          EntryFormatter<T> formatter) throws IOException {
+        this.version = version;
+        this.formatter = formatter;
+        try {
+            // Create the file if it does not exist.
+            Files.createFile(file.toPath());
+        } catch (FileAlreadyExistsException ex) {
+            // Ignore if file already exists.
+        }
+        absolutePath = file.toPath().toAbsolutePath();
+        tempPath = Paths.get(absolutePath.toString() + ".tmp");
+    }
+
+    public void write(Collection<T> entries) throws IOException {
+        synchronized (lock) {
+            // write to temp file and then swap with the existing file
+            try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
+                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+                // Write the version
+                writer.write(Integer.toString(version));
+                writer.newLine();
+
+                // Write the entries count
+                writer.write(Integer.toString(entries.size()));
+                writer.newLine();
+
+                // Write each entry on a new line.
+                for (T entry : entries) {
+                    writer.write(formatter.toString(entry));
+                    writer.newLine();
+                }
+
+                writer.flush();
+                fileOutputStream.getFD().sync();
+            }
+
+            Utils.atomicMoveWithFallback(tempPath, absolutePath);
+        }
+    }
+
+    public List<T> read() throws IOException {
+        synchronized (lock) {
+            try (BufferedReader reader = Files.newBufferedReader(absolutePath)) {
+                CheckpointReadBuffer<T> checkpointBuffer = new CheckpointReadBuffer<>(absolutePath.toString(), reader, version, formatter);
+                return checkpointBuffer.read();
+            }
+        }
+    }
+
+    private static class CheckpointReadBuffer<T> {
+
+        private final String location;
+        private final BufferedReader reader;
+        private final int version;
+        private final EntryFormatter<T> formatter;
+
+        CheckpointReadBuffer(String location,
+                             BufferedReader reader,
+                             int version,
+                             EntryFormatter<T> formatter) {
+            this.location = location;
+            this.reader = reader;
+            this.version = version;
+            this.formatter = formatter;
+        }
+
+        List<T> read() throws IOException {
+            String line = reader.readLine();
+            if (line == null)
+                return Collections.emptyList();
+
+            int readVersion = toInt(line);
+            if (readVersion != version) {
+                throw new IOException("Unrecognized version of the checkpoint file (" + location + "): " + readVersion);
+            }
+
+            line = reader.readLine();
+            if (line == null) {
+                return Collections.emptyList();
+            }
+            int expectedSize = toInt(line);
+            List<T> entries = new ArrayList<>(expectedSize);
+            line = reader.readLine();
+            while (line != null) {
+                Optional<T> maybeEntry = formatter.fromString(line);
+                if (!maybeEntry.isPresent()) {
+                    throw buildMalformedLineException(line);
+                }
+                entries.add(maybeEntry.get());
+                line = reader.readLine();
+            }
+
+            if (entries.size() != expectedSize) {
+                throw new IOException("Expected [" + expectedSize + "] entries in checkpoint file ["
+                                              + location + "], but found only [" + entries.size() + "]");
+            }
+
+            return entries;
+        }
+
+        private int toInt(String line) throws IOException {
+            try {
+                return Integer.parseInt(line);
+            } catch (NumberFormatException e) {
+                throw buildMalformedLineException(line);
+            }
+        }
+
+        private IOException buildMalformedLineException(String line) {
+            return new IOException(String.format("Malformed line in checkpoint file [%s]: %s", location, line));
+        }
+    }
+
+    /**
+     * This is used to convert the given entry of type {@code T} into a string and vice versa.
+     *
+     * @param <T> entry type
+     */
+    public interface EntryFormatter<T> {
+
+        /**
+         * @param entry entry to be converted into string.
+         * @return String representation of the given entry.
+         */
+        String toString(T entry);
+
+        /**
+         * @param value string representation of an entry.
+         * @return entry converted from the given string representation if possible. {@link Optional#empty()} represents
+         * that the given string representation could not be converted into an entry.
+         */
+        Optional<T> fromString(String value);
+    }
+}
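
For reference, a write/read round trip against the new class might look like the following
sketch, reusing the hypothetical BrokerEpochEntry and BrokerEpochFormatter from the earlier
sketch (the temp-file path and version number are arbitrary):

    import org.apache.kafka.server.common.CheckpointFile;

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class CheckpointFileExample {
        public static void main(String[] args) throws IOException {
            File file = File.createTempFile("broker-epoch-", ".checkpoint");
            CheckpointFile<BrokerEpochEntry> checkpoint =
                new CheckpointFile<>(file, 0, new BrokerEpochFormatter());

            // write() goes through a temp file, fsync and an atomic rename,
            // so readers never observe a partially written checkpoint.
            checkpoint.write(Arrays.asList(new BrokerEpochEntry(1, 5L), new BrokerEpochEntry(2, 7L)));

            // read() validates the version and the entry count before returning.
            List<BrokerEpochEntry> entries = checkpoint.read();
            System.out.println(entries.size()); // prints 2
        }
    }

Given the format described in CheckpointFile's javadoc (version, entry count, then one entry
per line), the file written above contains:

    0
    2
    1 5
    2 7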