You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2023/11/01 16:40:53 UTC
(kafka) branch trunk updated: KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eca85029904 KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)
eca85029904 is described below
commit eca85029904612d35da00ff34e9118d96968bd4e
Author: Alok Thatikunta <at...@confluent.io>
AuthorDate: Wed Nov 1 22:10:45 2023 +0530
KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module (#14607)
This PR moves PartitionMetadataFile to the storage module.
Existing unit tests in UnifiedLogTest like testLogFlushesPartitionMetadataOnAppend should suffice.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 6 +-
.../scala/kafka/server/PartitionMetadataFile.scala | 166 ---------------------
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 4 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 1 +
.../internals/checkpoint/PartitionMetadata.java | 42 ++++++
.../checkpoint/PartitionMetadataFile.java | 141 +++++++++++++++++
.../checkpoint/PartitionMetadataReadBuffer.java | 85 +++++++++++
7 files changed, 274 insertions(+), 171 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e8c88ccd199..d1fff6783d8 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -20,7 +20,7 @@ package kafka.log
import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
-import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
+import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
@@ -38,7 +38,7 @@ import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
@@ -182,7 +182,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
if (!remoteLogEnabled()) {
error("Ignoring the call as the remote log storage is disabled")
- return;
+ return
}
maybeIncrementLogStartOffset(remoteLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
}
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
deleted file mode 100644
index ec4425de9e6..00000000000
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-import java.util.regex.Pattern
-
-import kafka.utils.Logging
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
-
-
-object PartitionMetadataFile {
- private val PartitionMetadataFilename = "partition.metadata"
- private val WhiteSpacesPattern = Pattern.compile(":\\s+")
- private val CurrentVersion = 0
-
- def newFile(dir: File): File = new File(dir, PartitionMetadataFilename)
-
- object PartitionMetadataFileFormatter {
- def toFile(data: PartitionMetadata): String = {
- s"version: ${data.version}\ntopic_id: ${data.topicId}"
- }
-
- }
-
- class PartitionMetadataReadBuffer[T](location: String,
- reader: BufferedReader) extends Logging {
- def read(): PartitionMetadata = {
- def malformedLineException(line: String) =
- new IOException(s"Malformed line in checkpoint file ($location): '$line'")
-
- var line: String = null
- var metadataTopicId: Uuid = null
- try {
- line = reader.readLine()
- WhiteSpacesPattern.split(line) match {
- case Array(_, version) =>
- if (version.toInt == CurrentVersion) {
- line = reader.readLine()
- WhiteSpacesPattern.split(line) match {
- case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
- case _ => throw malformedLineException(line)
- }
- if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
- throw new IOException(s"Invalid topic ID in partition metadata file ($location)")
- }
- new PartitionMetadata(CurrentVersion, metadataTopicId)
- } else {
- throw new IOException(s"Unrecognized version of partition metadata file ($location): " + version)
- }
- case _ => throw malformedLineException(line)
- }
- } catch {
- case _: NumberFormatException => throw malformedLineException(line)
- }
- }
- }
-
-}
-
-class PartitionMetadata(val version: Int, val topicId: Uuid)
-
-
-class PartitionMetadataFile(val file: File,
- logDirFailureChannel: LogDirFailureChannel) extends Logging {
- import kafka.server.PartitionMetadataFile.{CurrentVersion, PartitionMetadataFileFormatter, PartitionMetadataReadBuffer}
-
- private val path = file.toPath.toAbsolutePath
- private val tempPath = Paths.get(path.toString + ".tmp")
- private val lock = new Object()
- private val logDir = file.getParentFile.getParent
- @volatile private var dirtyTopicIdOpt : Option[Uuid] = None
-
- /**
- * Records the topic ID that will be flushed to disk.
- */
- def record(topicId: Uuid): Unit = {
- // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
- dirtyTopicIdOpt.foreach { dirtyTopicId =>
- if (dirtyTopicId != topicId)
- throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
- s"but had already recorded $dirtyTopicId")
- }
- dirtyTopicIdOpt = Some(topicId)
- }
-
- def maybeFlush(): Unit = {
- // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
- dirtyTopicIdOpt.foreach { _ =>
- // We synchronize on the actual write to disk
- lock synchronized {
- dirtyTopicIdOpt.foreach { topicId =>
- try {
- // write to temp file and then swap with the existing file
- val fileOutputStream = new FileOutputStream(tempPath.toFile)
- val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
- try {
- writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
- writer.flush()
- fileOutputStream.getFD.sync()
- } finally {
- writer.close()
- }
-
- Utils.atomicMoveWithFallback(tempPath, path)
- } catch {
- case e: IOException =>
- val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
- logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
- throw new KafkaStorageException(msg, e)
- }
- dirtyTopicIdOpt = None
- }
- }
- }
- }
-
- def read(): PartitionMetadata = {
- lock synchronized {
- try {
- val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
- try {
- val partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath, reader)
- partitionBuffer.read()
- } finally {
- reader.close()
- }
- } catch {
- case e: IOException =>
- val msg = s"Error while reading partition metadata file ${file.getAbsolutePath}"
- logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
- throw new KafkaStorageException(msg, e)
- }
- }
- }
-
- def exists(): Boolean = {
- file.exists()
- }
-
- def delete(): Unit = {
- Files.delete(file.toPath)
- }
-
- override def toString: String = s"PartitionMetadataFile(path=$path)"
-}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 492ba87bc87..86bcf6c878b 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
-import kafka.server.{BrokerTopicStats, KafkaConfig, PartitionMetadataFile}
+import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
@@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0c41d667ad9..c6af66fbe2d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.util.timer.MockTimer
+import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java
new file mode 100644
index 00000000000..1928780900d
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadata.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+public class PartitionMetadata {
+ private final int version;
+ private final Uuid topicId;
+
+ public PartitionMetadata(int version, Uuid topicId) {
+ this.version = version;
+ this.topicId = topicId;
+ }
+
+ public int version() {
+ return version;
+ }
+
+ public Uuid topicId() {
+ return topicId;
+ }
+
+ public String encode() {
+ return "version: " + version + "\ntopic_id: " + topicId;
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java
new file mode 100644
index 00000000000..eaee2decad0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFile.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
+public class PartitionMetadataFile {
+ private static final String PARTITION_METADATA_FILE_NAME = "partition.metadata";
+ static final int CURRENT_VERSION = 0;
+
+ public static File newFile(File dir) {
+ return new File(dir, PARTITION_METADATA_FILE_NAME);
+ }
+
+ private final File file;
+ private final LogDirFailureChannel logDirFailureChannel;
+
+ private final Object lock = new Object();
+ private volatile Optional<Uuid> dirtyTopicIdOpt = Optional.empty();
+
+ public PartitionMetadataFile(
+ final File file,
+ final LogDirFailureChannel logDirFailureChannel
+ ) {
+ this.file = file;
+ this.logDirFailureChannel = logDirFailureChannel;
+ }
+
+ /**
+ * Records the topic ID that will be flushed to disk.
+ */
+ public void record(Uuid topicId) {
+ // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+ dirtyTopicIdOpt.ifPresent(dirtyTopicId -> {
+ if (dirtyTopicId != topicId) {
+ throw new InconsistentTopicIdException("Tried to record topic ID $topicId to file " +
+ "but had already recorded $dirtyTopicId");
+ }
+ });
+ dirtyTopicIdOpt = Optional.of(topicId);
+ }
+
+ public void maybeFlush() {
+ // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
+ if (dirtyTopicIdOpt.isPresent()) {
+ // We synchronize on the actual write to disk
+ synchronized (lock) {
+ dirtyTopicIdOpt.ifPresent(topicId -> {
+ try {
+ try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath().toFile());
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+ writer.write(new PartitionMetadata(CURRENT_VERSION, topicId).encode());
+ writer.flush();
+ fileOutputStream.getFD().sync();
+ }
+
+ Utils.atomicMoveWithFallback(tempPath(), path());
+ } catch (IOException e) {
+ String msg = "Error while writing partition metadata file " + file.getAbsolutePath();
+ logDirFailureChannel.maybeAddOfflineLogDir(logDir(), msg, e);
+ throw new KafkaStorageException(msg, e);
+ }
+ dirtyTopicIdOpt = Optional.empty();
+ });
+ }
+ }
+ }
+
+ public PartitionMetadata read() {
+ synchronized (lock) {
+ try {
+ try (BufferedReader reader = Files.newBufferedReader(path(), StandardCharsets.UTF_8)) {
+ PartitionMetadataReadBuffer partitionBuffer = new PartitionMetadataReadBuffer(file.getAbsolutePath(), reader);
+ return partitionBuffer.read();
+ }
+ } catch (IOException e) {
+ String msg = "Error while reading partition metadata file " + file.getAbsolutePath();
+ logDirFailureChannel.maybeAddOfflineLogDir(logDir(), msg, e);
+ throw new KafkaStorageException(msg, e);
+ }
+ }
+ }
+
+ public boolean exists() {
+ return file.exists();
+ }
+
+ public void delete() throws IOException {
+ Files.delete(file.toPath());
+ }
+
+ private Path path() {
+ return file.toPath().toAbsolutePath();
+ }
+
+ private Path tempPath() {
+ return Paths.get(path() + ".tmp");
+ }
+
+ private String logDir() {
+ return file.getParentFile().getParent();
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionMetadataFile(" +
+ "path=" + path() +
+ ')';
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java
new file mode 100644
index 00000000000..106df194111
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataReadBuffer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * Parses the two-line text content of a partition metadata file (a version line followed by a
+ * topic ID line) from a {@link BufferedReader}.
+ */
+public class PartitionMetadataReadBuffer {
+    // Separator between the key and the value on each line: a colon followed by whitespace.
+    private static final Pattern WHITE_SPACES_PATTERN = Pattern.compile(":\\s+");
+
+    private final String location;
+    private final BufferedReader reader;
+
+    /**
+     * @param location description of where the content comes from; only used in error messages
+     * @param reader   source of the lines to parse; it is not closed by this class
+     */
+    public PartitionMetadataReadBuffer(
+        String location,
+        BufferedReader reader
+    ) {
+        this.location = location;
+        this.reader = reader;
+    }
+
+    /**
+     * Reads the version line and the topic ID line and builds the metadata from them.
+     *
+     * @return the parsed metadata, carrying the version found in the file
+     * @throws IOException if a line is missing or does not split into exactly a key and a value,
+     *                     if the version is not an integer or is below the current version,
+     *                     or if the topic ID equals {@code Uuid.ZERO_UUID}
+     */
+    PartitionMetadata read() throws IOException {
+        String line = null;
+        Uuid metadataTopicId;
+
+        try {
+            line = readRequiredLine();
+            String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+            if (versionArr.length != 2) {
+                throw malformedLineException(line);
+            }
+
+            int version = Integer.parseInt(versionArr[1]);
+            // To ensure downgrade compatibility, check if version is at least 0
+            if (version < PartitionMetadataFile.CURRENT_VERSION) {
+                throw new IOException("Unrecognized version of partition metadata file (" + location + "): " + version);
+            }
+
+            line = readRequiredLine();
+            String[] topicIdArr = WHITE_SPACES_PATTERN.split(line);
+            if (topicIdArr.length != 2) {
+                throw malformedLineException(line);
+            }
+
+            metadataTopicId = Uuid.fromString(topicIdArr[1]);
+            if (metadataTopicId.equals(Uuid.ZERO_UUID)) {
+                throw new IOException("Invalid topic ID in partition metadata file (" + location + ")");
+            }
+
+            return new PartitionMetadata(version, metadataTopicId);
+        } catch (NumberFormatException e) {
+            throw malformedLineException(line, e);
+        }
+    }
+
+    // Reads the next line, failing with an IOException instead of returning null at end of input,
+    // so that an empty or truncated file is reported like any other malformed content.
+    private String readRequiredLine() throws IOException {
+        String nextLine = reader.readLine();
+        if (nextLine == null) {
+            throw new IOException("Unexpected end of partition metadata file (" + location + ")");
+        }
+        return nextLine;
+    }
+
+    // Builds the exception used when a line does not have the expected "key: value" shape.
+    private IOException malformedLineException(String line) {
+        return new IOException(String.format("Malformed line in checkpoint file [%s]: %s", location, line));
+    }
+
+    // Same as above, but keeps the underlying parse failure as the cause.
+    private IOException malformedLineException(String line, Exception e) {
+        return new IOException(String.format("Malformed line in checkpoint file [%s]: %s", location, line), e);
+    }
+}