You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2023/11/01 16:40:53 UTC

(kafka) branch trunk updated: KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eca85029904 KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)
eca85029904 is described below

commit eca85029904612d35da00ff34e9118d96968bd4e
Author: Alok Thatikunta <at...@confluent.io>
AuthorDate: Wed Nov 1 22:10:45 2023 +0530

    KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)
    
    This PR moves PartitionMetadataFile to the storage module.
    
    Existing unit tests in UnifiedLogTest like testLogFlushesPartitionMetadataOnAppend should suffice.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |   6 +-
 .../scala/kafka/server/PartitionMetadataFile.scala | 166 ---------------------
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   1 +
 .../internals/checkpoint/PartitionMetadata.java    |  42 ++++++
 .../checkpoint/PartitionMetadataFile.java          | 141 +++++++++++++++++
 .../checkpoint/PartitionMetadataReadBuffer.java    |  85 +++++++++++
 7 files changed, 274 insertions(+), 171 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e8c88ccd199..d1fff6783d8 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -20,7 +20,7 @@ package kafka.log
 import com.yammer.metrics.core.MetricName
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
@@ -38,7 +38,7 @@ import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
 
@@ -182,7 +182,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
     if (!remoteLogEnabled()) {
       error("Ignoring the call as the remote log storage is disabled")
-      return;
+      return
     }
     maybeIncrementLogStartOffset(remoteLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
   }
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
deleted file mode 100644
index ec4425de9e6..00000000000
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-import java.util.regex.Pattern
-
-import kafka.utils.Logging
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
-
-
-object PartitionMetadataFile {
-  private val PartitionMetadataFilename = "partition.metadata"
-  private val WhiteSpacesPattern = Pattern.compile(":\\s+")
-  private val CurrentVersion = 0
-
-  def newFile(dir: File): File = new File(dir, PartitionMetadataFilename)
-
-  object PartitionMetadataFileFormatter {
-    def toFile(data: PartitionMetadata): String = {
-      s"version: ${data.version}\ntopic_id: ${data.topicId}"
-    }
-
-  }
-
-  class PartitionMetadataReadBuffer[T](location: String,
-                                       reader: BufferedReader) extends Logging {
-    def read(): PartitionMetadata = {
-      def malformedLineException(line: String) =
-        new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
-      var line: String = null
-      var metadataTopicId: Uuid = null
-      try {
-        line = reader.readLine()
-        WhiteSpacesPattern.split(line) match {
-          case Array(_, version) =>
-            if (version.toInt == CurrentVersion) {
-              line = reader.readLine()
-              WhiteSpacesPattern.split(line) match {
-                case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
-                case _ => throw malformedLineException(line)
-              }
-              if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
-                throw new IOException(s"Invalid topic ID in partition metadata file ($location)")
-              }
-              new PartitionMetadata(CurrentVersion, metadataTopicId)
-            } else {
-              throw new IOException(s"Unrecognized version of partition metadata file ($location): " + version)
-            }
-          case _ => throw malformedLineException(line)
-        }
-      } catch {
-        case _: NumberFormatException => throw malformedLineException(line)
-      }
-    }
-  }
-
-}
-
-class PartitionMetadata(val version: Int, val topicId: Uuid)
-
-
-class PartitionMetadataFile(val file: File,
-                            logDirFailureChannel: LogDirFailureChannel) extends Logging {
-  import kafka.server.PartitionMetadataFile.{CurrentVersion, PartitionMetadataFileFormatter, PartitionMetadataReadBuffer}
-
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-  private val logDir = file.getParentFile.getParent
-  @volatile private var dirtyTopicIdOpt : Option[Uuid] = None
-
-  /**
-   * Records the topic ID that will be flushed to disk.
-   */
-  def record(topicId: Uuid): Unit = {
-    // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
-    dirtyTopicIdOpt.foreach { dirtyTopicId =>
-      if (dirtyTopicId != topicId)
-        throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
-          s"but had already recorded $dirtyTopicId")
-    }
-    dirtyTopicIdOpt = Some(topicId)
-  }
-
-  def maybeFlush(): Unit = {
-    // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
-    dirtyTopicIdOpt.foreach { _ =>
-      // We synchronize on the actual write to disk
-      lock synchronized {
-        dirtyTopicIdOpt.foreach { topicId =>
-          try {
-            // write to temp file and then swap with the existing file
-            val fileOutputStream = new FileOutputStream(tempPath.toFile)
-            val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-            try {
-              writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
-              writer.flush()
-              fileOutputStream.getFD.sync()
-            } finally {
-              writer.close()
-            }
-
-            Utils.atomicMoveWithFallback(tempPath, path)
-          } catch {
-            case e: IOException =>
-              val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
-              logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-              throw new KafkaStorageException(msg, e)
-          }
-          dirtyTopicIdOpt = None
-        }
-      }
-    }
-  }
-
-  def read(): PartitionMetadata = {
-    lock synchronized {
-      try {
-        val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
-        try {
-          val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader)
-          partitionBuffer.read()
-        } finally {
-          reader.close()
-        }
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while reading partition metadata file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-          throw new KafkaStorageException(msg, e)
-      }
-    }
-  }
-
-  def exists(): Boolean = {
-    file.exists()
-  }
-
-  def delete(): Unit = {
-    Files.delete(file.toPath)
-  }
-
-  override def toString: String = s"PartitionMetadataFile(path=$path)"
-}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 492ba87bc87..86bcf6c878b 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
-import kafka.server.{BrokerTopicStats, KafkaConfig, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.utils._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
@@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0c41d667ad9..c6af66fbe2d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.config.AbstractConfig
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java
new file mode 100644
index 00000000000..1928780900d
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+/** Immutable holder for a version number and a topic ID; encode() renders them as two "key: value" lines. */
+public class PartitionMetadata {
+    private final int version;
+    private final Uuid topicId;
+
+    public PartitionMetadata(int version, Uuid topicId) {
+        this.version = version;
+        this.topicId = topicId;
+    }
+
+    public int version() {
+        return this.version;
+    }
+    public Uuid topicId() {
+        return this.topicId;
+    }
+
+    public String encode() {
+        return String.join("\n", "version: " + this.version, "topic_id: " + this.topicId);
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java
new file mode 100644
index 00000000000..eaee2decad0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+/**
+ * Reads and writes the "partition.metadata" file; a topic ID passed to record() is written to disk by maybeFlush().
+ */
+public class PartitionMetadataFile {
+    private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+    static final int CURRENT_VERSION = 0;
+
+    public static File newFile(File dir) {
+        return new File(dir, PARTITION_METADATA_FILE_NAME);
+    }
+
+    private final File file;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    private final Object lock = new Object();
+    private volatile Optional<Uuid> dirtyTopicIdOpt = Optional.empty();
+
+    public PartitionMetadataFile(final File file, final LogDirFailureChannel logDirFailureChannel) {
+        this.file = file;
+        this.logDirFailureChannel = logDirFailureChannel;
+    }
+
+    /**
+     * Records the topic ID that will be flushed to disk.
+     * @throws InconsistentTopicIdException if a different topic ID was recorded and has not been flushed yet
+     */
+    public void record(Uuid topicId) {
+        // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+        dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+            // Compare by value: two distinct Uuid instances may carry the same ID, so a reference check would be wrong.
+            if (!dirtyTopicId.equals(topicId)) {
+                throw new InconsistentTopicIdException("Tried to record topic ID " + topicId + " to file " +
+                    "but had already recorded " + dirtyTopicId);
+            }
+        });
+        dirtyTopicIdOpt = Optional.of(topicId);
+    }
+
+    /** Writes the pending topic ID, if any, to a temp file and moves it over the metadata file; IO failures become KafkaStorageException. */
+    public void maybeFlush() {
+        // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
+        if (dirtyTopicIdOpt.isPresent()) {
+            // We synchronize on the actual write to disk
+            synchronized (lock) {
+                dirtyTopicIdOpt.ifPresent(topicId -> {
+                    try {
+                        try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath().toFile());
+                             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+                            writer.write(new PartitionMetadata(CURRENT_VERSION, topicId).encode());
+                            writer.flush();
+                            fileOutputStream.getFD().sync();
+                        }
+                        // The temp file is closed at this point; move it over the existing metadata file
+                        Utils.atomicMoveWithFallback(tempPath(), path());
+                    } catch (IOException e) {
+                        String msg = "Error while writing partition metadata file " + file.getAbsolutePath();
+                        logDirFailureChannel.maybeAddOfflineLogDir(logDir(), msg, e);
+                        throw new KafkaStorageException(msg, e);
+                    }
+                    dirtyTopicIdOpt = Optional.empty();
+                });
+            }
+        }
+    }
+
+    /** Reads and parses the metadata file under the lock; IO failures are reported to the failure channel and rethrown as KafkaStorageException. */
+    public PartitionMetadata read() {
+        synchronized (lock) {
+            try (BufferedReader reader = Files.newBufferedReader(path(), StandardCharsets.UTF_8)) {
+                PartitionMetadataReadBuffer partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath(), reader);
+                return partitionBuffer.read();
+            } catch (IOException e) {
+                String msg = "Error while reading partition metadata file " + file.getAbsolutePath();
+                logDirFailureChannel.maybeAddOfflineLogDir(logDir(), msg, e);
+                throw new KafkaStorageException(msg, e);
+            }
+        }
+    }
+
+    public boolean exists() {
+        return file.exists();
+    }
+
+    public void delete() throws IOException {
+        Files.delete(file.toPath());
+    }
+
+    private Path path() {
+        return file.toPath().toAbsolutePath();
+    }
+
+    private Path tempPath() {
+        return Paths.get(path() + ".tmp");
+    }
+
+    private String logDir() {
+        return file.getParentFile().getParent();
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionMetadataFile(path=" + path() + ')';
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java
new file mode 100644
index 00000000000..106df194111
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/** Parses a version line and a topic ID line from a reader into a {@link PartitionMetadata}. */
+public class PartitionMetadataReadBuffer {
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    public PartitionMetadataReadBuffer(String location, BufferedReader reader) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    /**
+     * Reads two lines and splits each on a colon followed by whitespace; the second field is the value.
+     *
+     * @return the version from the first line and the topic ID from the second line
+     * @throws IOException if reading fails, a line is malformed, the version is too low, or the topic ID is zero
+     */
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = reader.readLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+            if (versionArr.length == 2) {
+                int version = Integer.parseInt(versionArr[1]);
+                // To ensure downgrade compatibility, check if version is at least 0
+                if (version >= PartitionMetadataFile.CURRENT_VERSION) {
+                    line = reader.readLine();
+                    String[] topicIdArr = WHITE_SPACES_PATTERN.split(line);
+                    if (topicIdArr.length == 2) {
+                        metadataTopicId = Uuid.fromString(topicIdArr[1]);
+                        if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                            throw new IOException("Invalid topic ID in partition metadata file (" + location + ")");
+                        }
+                        return new PartitionMetadata(version, metadataTopicId);
+                    } else {
+                        throw malformedLineException(line);
+                    }
+                } else {
+                    throw new IOException("Unrecognized version of partition metadata file (" + location + "): " + version);
+                }
+            } else {
+                throw malformedLineException(line);
+            }
+        } catch (NumberFormatException e) {
+            throw malformedLineException(line, e);
+        }
+    }
+
+    private IOException malformedLineException(String line) {
+        return new IOException(String.format("Malformed line in checkpoint file [%s]: %s", location, line));
+    }
+
+    private IOException malformedLineException(String line, Exception e) {
+        return new IOException(String.format("Malformed line in checkpoint file [%s]: %s", location, line), e);
+    }
+}