You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/31 07:27:49 UTC

[kafka] branch 2.8 updated: KAFKA-13092: Perf regression in LISR requests (#11073)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new fb47f6a  KAFKA-13092: Perf regression in LISR requests (#11073)
fb47f6a is described below

commit fb47f6a9b7da1a3b092b081b2ed20c86df29a849
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Tue Aug 31 00:26:02 2021 -0700

    KAFKA-13092: Perf regression in LISR requests (#11073)
    
    After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files.
    
    We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.
    
    (cherry-picked from 584213ed20d679b11206b67c5a65035347632f07)
    
    Reviewers:  Lucas Bradstreet <lu...@confluent.io>, Jun Rao <ju...@gmail.com>, David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/log/Log.scala            |  37 +++-
 .../scala/kafka/server/PartitionMetadataFile.scala |  61 ++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  55 ++++-
 .../kafka/jmh/server/PartitionCreationBench.java   | 237 +++++++++++++++++++++
 4 files changed, 360 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0760816..debd257 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -577,14 +577,6 @@ class Log(@volatile private var _dir: File,
     partitionMetadataFile = new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)
   }
 
-  /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
-  def assignTopicId(topicId: Uuid): Unit = {
-    if (keepPartitionMetadataFile) {
-      partitionMetadataFile.write(topicId)
-      this.topicId = topicId
-    }
-  }
-
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
     val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
 
@@ -609,6 +601,29 @@ class Log(@volatile private var _dir: File,
     }
   }
 
+  private def maybeFlushMetadataFile(): Unit = {
+    partitionMetadataFile.maybeFlush()
+  }
+
+  /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
+  def assignTopicId(topicId: Uuid): Unit = {
+    if (!this.topicId.equals(Uuid.ZERO_UUID)) {
+      if (!this.topicId.equals(topicId)) {
+        // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower
+        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+          s"but log already contained topic ID ${this.topicId}")
+      }
+    }
+
+    if (keepPartitionMetadataFile) {
+      this.topicId = topicId
+      if (!partitionMetadataFile.exists()) {
+        partitionMetadataFile.record(topicId)
+        scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
+      }
+    }
+  }
+
   /**
    * Removes any temporary files found in log directory, and creates a list of all .swap files which could be swapped
    * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is higher than
@@ -1056,6 +1071,7 @@ class Log(@volatile private var _dir: File,
   def close(): Unit = {
     debug("Closing log")
     lock synchronized {
+      maybeFlushMetadataFile()
       checkIfMemoryMappedBufferClosed()
       producerExpireCheck.cancel(true)
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
@@ -1076,6 +1092,8 @@ class Log(@volatile private var _dir: File,
   def renameDir(name: String): Unit = {
     lock synchronized {
       maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
+        // Flush partitionMetadata file before initializing again
+        maybeFlushMetadataFile()
         val renamedDir = new File(dir.getParent, name)
         Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
         if (renamedDir != dir) {
@@ -1160,6 +1178,9 @@ class Log(@volatile private var _dir: File,
                      validateAndAssignOffsets: Boolean,
                      leaderEpoch: Int,
                      ignoreRecordSize: Boolean): LogAppendInfo = {
+    // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
+    // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
+    maybeFlushMetadataFile()
 
     val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
 
diff --git a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
index 25b1ba6..1ccbcc1 100644
--- a/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
+++ b/core/src/main/scala/kafka/server/PartitionMetadataFile.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 
 import kafka.utils.Logging
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException}
 import org.apache.kafka.common.utils.Utils
 
 
@@ -90,27 +90,48 @@ class PartitionMetadataFile(val file: File,
   private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
   private val logDir = file.getParentFile.getParent
+  @volatile private var dirtyTopicIdOpt : Option[Uuid] = None
+
+  /**
+   * Records the topic ID that will be flushed to disk.
+   */
+  def record(topicId: Uuid): Unit = {
+    // Topic IDs should not differ, but we defensively check here to fail earlier in the case that the IDs somehow differ.
+    dirtyTopicIdOpt.foreach { dirtyTopicId =>
+      if (dirtyTopicId != topicId)
+        throw new InconsistentTopicIdException(s"Tried to record topic ID $topicId to file " +
+          s"but had already recorded $dirtyTopicId")
+    }
+    dirtyTopicIdOpt = Some(topicId)
+  }
 
-  def write(topicId: Uuid): Unit = {
-    lock synchronized {
-      try {
-        // write to temp file and then swap with the existing file
-        val fileOutputStream = new FileOutputStream(tempPath.toFile)
-        val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-        try {
-          writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion,topicId)))
-          writer.flush()
-          fileOutputStream.getFD().sync()
-        } finally {
-          writer.close()
-        }
+  def maybeFlush(): Unit = {
+    // We check dirtyTopicId first to avoid having to take the lock unnecessarily in the frequently called log append path
+    dirtyTopicIdOpt.foreach { _ =>
+      // We synchronize on the actual write to disk
+      lock synchronized {
+        dirtyTopicIdOpt.foreach { topicId =>
+          try {
+            // write to temp file and then swap with the existing file
+            val fileOutputStream = new FileOutputStream(tempPath.toFile)
+            val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
+            try {
+              writer.write(PartitionMetadataFileFormatter.toFile(new PartitionMetadata(CurrentVersion, topicId)))
+              writer.flush()
+              fileOutputStream.getFD().sync()
+            } finally {
+              writer.close()
+            }
 
-        Utils.atomicMoveWithFallback(tempPath, path)
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
-          throw new KafkaStorageException(msg, e)
+            Utils.atomicMoveWithFallback(tempPath, path)
+          } catch {
+            case e: IOException =>
+              val msg = s"Error while writing to partition metadata file ${file.getAbsolutePath}"
+              logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+              throw new KafkaStorageException(msg, e)
+          }
+          dirtyTopicIdOpt = None
+        }
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 947bcb7..b294713 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2547,7 +2547,7 @@ class LogTest {
     var log = createLog(logDir, logConfig)
 
     val topicId = Uuid.randomUuid()
-    log.partitionMetadataFile.write(topicId)
+    log.assignTopicId(topicId)
     log.close()
 
     // test recovery case
@@ -2556,6 +2556,37 @@ class LogTest {
     log.close()
   }
 
+  def testLogFlushesPartitionMetadataOnAppend(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val record = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("simpleValue".getBytes))
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Should trigger a synchronous flush
+    log.appendAsLeader(record, leaderEpoch = 0)
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
+  @Test
+  def testLogFlushesPartitionMetadataOnClose(): Unit = {
+    val logConfig = LogTest.createLogConfig()
+    var log = createLog(logDir, logConfig)
+
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Should trigger a synchronous flush
+    log.close()
+
+    // We open the log again, and the partition metadata file should exist with the same ID.
+    log = createLog(logDir, logConfig)
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
   /**
    * Test building the time index on the follower by setting assignOffsets to false.
    */
@@ -3117,7 +3148,7 @@ class LogTest {
     // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
     val id = Uuid.randomUuid()
     log.topicId = id
-    log.partitionMetadataFile.write(id)
+    log.assignTopicId(id)
 
     log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5)
     assertEquals(Some(5), log.latestEpoch)
@@ -3136,6 +3167,26 @@ class LogTest {
   }
 
   @Test
+  def testTopicIdFlushesBeforeDirectoryRename(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
+    val log = createLog(logDir, logConfig)
+
+    // Write a topic ID to the partition metadata file to ensure it is transferred correctly.
+    val topicId = Uuid.randomUuid()
+    log.partitionMetadataFile.record(topicId)
+
+    // Ensure that after a directory rename, the partition metadata file is written to the right location.
+    val tp = Log.parseTopicPartitionName(log.dir)
+    log.renameDir(Log.logDeleteDirName(tp))
+    assertTrue(PartitionMetadataFile.newFile(log.dir).exists())
+    assertFalse(PartitionMetadataFile.newFile(this.logDir).exists())
+
+    // Check the file holds the correct contents.
+    assertTrue(log.partitionMetadataFile.exists())
+    assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+  }
+
+  @Test
   def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024)
     val log = createLog(logDir, logConfig)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
new file mode 100644
index 0000000..d78bad3
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.server;
+
+import java.util.Properties;
+
+import kafka.cluster.Partition;
+import kafka.log.CleanerConfig;
+import kafka.log.Defaults;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.AlterIsrManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+import kafka.server.ZkMetadataCache;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.server.metadata.CachedConfigRepository;
+import kafka.server.metadata.ConfigRepository;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.Scheduler;
+import kafka.utils.TestUtils;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.LeaderAndIsrRequestData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+import scala.Option;
+
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(3)
+@BenchmarkMode(Mode.AverageTime)
+@State(value = Scope.Benchmark)
+public class PartitionCreationBench {
+    @Param({"false", "true"})
+    public boolean useTopicIds;
+
+    @Param({"2000"})
+    public int numPartitions;
+
+    private final String topicName = "foo";
+
+    private Option<Uuid> topicId;
+    private Scheduler scheduler;
+    private Metrics metrics;
+    private Time time;
+    private KafkaConfig brokerProperties;
+
+    private ReplicaManager replicaManager;
+    private QuotaFactory.QuotaManagers quotaManagers;
+    private LogDirFailureChannel failureChannel;
+    private LogManager logManager;
+    private AlterIsrManager alterIsrManager;
+    private List<TopicPartition> topicPartitions;
+
+    @SuppressWarnings("deprecation")
+    @Setup(Level.Invocation)
+    public void setup() {
+        if (useTopicIds)
+            topicId = Option.apply(Uuid.randomUuid());
+        else
+            topicId = Option.empty();
+
+        this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
+        this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
+                0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
+                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
+                (short) 1));
+        this.metrics = new Metrics();
+        this.time = Time.SYSTEM;
+        this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
+        final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        final List<File> files =
+                JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        CleanerConfig cleanerConfig = CleanerConfig.apply(1,
+                4 * 1024 * 1024L, 0.9d,
+                1024 * 1024, 32 * 1024 * 1024,
+                Double.MAX_VALUE, 15 * 1000, true, "MD5");
+
+        ConfigRepository configRepository = new CachedConfigRepository();
+        this.logManager = new LogManager(JavaConverters.asScalaIteratorConverter(files.iterator()).asScala().toSeq(),
+                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
+                configRepository,
+                createLogConfig(),
+                cleanerConfig,
+                1,
+                1000L,
+                10000L,
+                10000L,
+                1000L,
+                60000,
+                scheduler,
+                brokerTopicStats,
+                failureChannel,
+                Time.SYSTEM,
+                true);
+        scheduler.startup();
+        final MetadataCache metadataCache =
+                new ZkMetadataCache(this.brokerProperties.brokerId());
+        this.quotaManagers =
+                QuotaFactory.instantiate(this.brokerProperties,
+                        this.metrics,
+                        this.time, "");
+
+        KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
+            @Override
+            public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
+                return new Properties();
+            }
+        };
+        this.alterIsrManager = TestUtils.createAlterIsrManager();
+        this.replicaManager = new ReplicaManager(
+                this.brokerProperties,
+                this.metrics,
+                this.time,
+                Option.apply(zkClient),
+                this.scheduler,
+                this.logManager,
+                new AtomicBoolean(false),
+                this.quotaManagers,
+                brokerTopicStats,
+                metadataCache,
+                this.failureChannel,
+                alterIsrManager,
+                configRepository,
+                Option.empty());
+        replicaManager.startup();
+        replicaManager.checkpointHighWatermarks();
+    }
+
+    @TearDown(Level.Invocation)
+    public void tearDown() throws Exception {
+        this.replicaManager.shutdown(false);
+        logManager.shutdown();
+        this.metrics.close();
+        this.scheduler.shutdown();
+        this.quotaManagers.shutdown();
+        for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
+            Utils.delete(dir);
+        }
+    }
+
+    private static LogConfig createLogConfig() {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
+        logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
+        logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
+        logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
+        logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
+        logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
+        logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
+        logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
+        logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
+        logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
+        logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
+        return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
+    }
+
+    @Benchmark
+    @Threads(1)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    public void makeFollower() {
+        topicPartitions = new ArrayList<>();
+        for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
+            topicPartitions.add(new TopicPartition(topicName, partitionNum));
+        }
+
+        List<Integer> replicas = new ArrayList<>();
+        replicas.add(0);
+        replicas.add(1);
+        replicas.add(2);
+
+        OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
+        for (TopicPartition topicPartition : topicPartitions) {
+            final Partition partition = this.replicaManager.createPartition(topicPartition);
+            List<Integer> inSync = new ArrayList<>();
+            inSync.add(0);
+            inSync.add(1);
+            inSync.add(2);
+
+            LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+                    .setControllerEpoch(0)
+                    .setLeader(0)
+                    .setLeaderEpoch(0)
+                    .setIsr(inSync)
+                    .setZkVersion(1)
+                    .setReplicas(replicas)
+                    .setIsNew(true);
+
+            partition.makeFollower(partitionState, checkpoints);
+            topicId.foreach(partition::checkOrSetTopicId);
+        }
+    }
+}