Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/05 19:01:47 UTC

[kafka] branch trunk updated: MINOR: Do not use optional args in `ProducerStateManager` (#11734)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba0fe61  MINOR: Do not use optional args in `ProducerStateManager` (#11734)
ba0fe61 is described below

commit ba0fe610eddf2710cdb404f198a9886c01293de3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sat Feb 5 11:00:17 2022 -0800

    MINOR: Do not use optional args in `ProducerStateManager` (#11734)
    
    We allowed `maxProducerIdExpirationMs` and `time` to be optional in the `ProducerStateManager` constructor. We generally frown on optional arguments since it is too easy to overlook them. In this case, it was especially dangerous because the recently added `maxTransactionTimeoutMs` argument used the same type as `maxProducerIdExpirationMs`.
    
    Reviewers: David Jacot <david.jacot@gmail.com>, Ismael Juma <is...@juma.me.uk>
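
For illustration only (not part of the commit message): a minimal Scala sketch, using a hypothetical `StateManager` class rather than the real `ProducerStateManager`, of the pitfall described above. When two parameters share the type `Int` and one has a default, a positional call can bind a value to the wrong parameter with no compiler warning.

    // A minimal sketch, assuming a hypothetical `StateManager` class, of why
    // same-typed optional arguments are risky: both timeouts are plain Ints,
    // so a positional call can silently bind a value to the wrong parameter
    // and leave the other at its default.
    class StateManager(
      val maxTransactionTimeoutMs: Int,
      val maxProducerIdExpirationMs: Int = 60 * 60 * 1000
    )

    object StateManagerExample extends App {
      // The caller means to configure the producer id expiration, but the value
      // binds to maxTransactionTimeoutMs because it is the first positional Int.
      val manager = new StateManager(30 * 60 * 1000)
      println(manager.maxTransactionTimeoutMs)   // 1800000 -- not what was intended
      println(manager.maxProducerIdExpirationMs) // 3600000 -- default used silently
    }

Making every argument required, as this commit does, forces call sites to spell out both timeouts and the `Time` instance explicitly, so a mix-up like the one above fails to compile instead of passing silently.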
---
 .../scala/kafka/log/ProducerStateManager.scala     | 12 +++++----
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  4 +--
 .../unit/kafka/cluster/PartitionLockTest.scala     |  8 +++++-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  9 +++++--
 .../test/scala/unit/kafka/log/LogSegmentTest.scala | 29 ++++++++++++++--------
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  4 +--
 6 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 8313f24..60d2d28 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -483,11 +483,13 @@ object ProducerStateManager {
  * been deleted.
  */
 @nonthreadsafe
-class ProducerStateManager(val topicPartition: TopicPartition,
-                           @volatile var _logDir: File,
-                           val maxTransactionTimeoutMs: Int,
-                           val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-                           val time: Time = Time.SYSTEM) extends Logging {
+class ProducerStateManager(
+  val topicPartition: TopicPartition,
+  @volatile var _logDir: File,
+  val maxTransactionTimeoutMs: Int,
+  val maxProducerIdExpirationMs: Int,
+  val time: Time
+) extends Logging {
   import ProducerStateManager._
   import java.util
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 11f66aa..3742d63 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1775,7 +1775,7 @@ object UnifiedLog extends Logging {
             recoveryPoint: Long,
             scheduler: Scheduler,
             brokerTopicStats: BrokerTopicStats,
-            time: Time = Time.SYSTEM,
+            time: Time,
             maxTransactionTimeoutMs: Int,
             maxProducerIdExpirationMs: Int,
             producerIdExpirationCheckIntervalMs: Int,
@@ -1794,7 +1794,7 @@ object UnifiedLog extends Logging {
       config.recordVersion,
       s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
-      maxTransactionTimeoutMs, maxProducerIdExpirationMs)
+      maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
     val offsets = new LogLoader(
       dir,
       topicPartition,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8c13035..b55c62f 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -287,7 +287,13 @@ class PartitionLockTest extends Logging {
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxTransactionTimeout = 5 * 60 * 1000
         val maxProducerIdExpirationMs = 60 * 60 * 1000
-        val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxTransactionTimeout, maxProducerIdExpirationMs)
+        val producerStateManager = new ProducerStateManager(
+          log.topicPartition,
+          log.dir,
+          maxTransactionTimeout,
+          maxProducerIdExpirationMs,
+          mockTime
+        )
         val offsets = new LogLoader(
           log.dir,
           log.topicPartition,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index bcab6ec..972d9d5 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -214,8 +214,13 @@ class PartitionTest extends AbstractPartitionTest {
         val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
         val maxTransactionTimeoutMs = 5 * 60 * 1000
         val maxProducerIdExpirationMs = 60 * 60 * 1000
-        val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir,
-          maxTransactionTimeoutMs, maxProducerIdExpirationMs)
+        val producerStateManager = new ProducerStateManager(
+          log.topicPartition,
+          log.dir,
+          maxTransactionTimeoutMs,
+          maxProducerIdExpirationMs,
+          mockTime
+        )
         val offsets = new LogLoader(
           log.dir,
           log.topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 226330c..52d58f2 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -19,8 +19,7 @@ package kafka.log
 import java.io.File
 
 import kafka.server.checkpoints.LeaderEpochCheckpoint
-import kafka.server.epoch.EpochEntry
-import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.checkEquals
 import org.apache.kafka.common.TopicPartition
@@ -29,13 +28,11 @@ import org.apache.kafka.common.utils.{MockTime, Time, Utils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
-import scala.jdk.CollectionConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
 
 class LogSegmentTest {
-
-  private val maxTransactionTimeoutMs = 5 * 60 * 1000
   private val topicPartition = new TopicPartition("topic", 0)
   private val segments = mutable.ArrayBuffer[LogSegment]()
   private var logDir: File = _
@@ -302,7 +299,7 @@ class LogSegmentTest {
       seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
     val indexFile = seg.lazyOffsetIndex.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
-    seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+    seg.recover(newProducerStateManager())
     for(i <- 0 until 100) {
       val records = seg.read(i, 1, minOneMessage = true).records.records
       assertEquals(i, records.iterator.next().offset)
@@ -342,7 +339,7 @@ class LogSegmentTest {
     segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
       shallowOffsetOfMaxTimestamp = 107L, records = endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
 
-    var stateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs)
+    var stateManager = newProducerStateManager()
     segment.recover(stateManager)
     assertEquals(108L, stateManager.mapEndOffset)
 
@@ -356,7 +353,7 @@ class LogSegmentTest {
     assertEquals(100L, abortedTxn.lastStableOffset)
 
     // recover again, but this time assuming the transaction from pid2 began on a previous segment
-    stateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs)
+    stateManager = newProducerStateManager()
     stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
       mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
       0, RecordBatch.NO_TIMESTAMP, Some(75L)))
@@ -407,7 +404,7 @@ class LogSegmentTest {
       shallowOffsetOfMaxTimestamp = 110, records = MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
         new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
 
-    seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs), Some(cache))
+    seg.recover(newProducerStateManager(), Some(cache))
     assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
                              EpochEntry(epoch = 1, startOffset = 106),
                              EpochEntry(epoch = 2, startOffset = 110)),
@@ -436,7 +433,7 @@ class LogSegmentTest {
       seg.append(i, i * 10, i, records(i, i.toString))
     val timeIndexFile = seg.lazyTimeIndex.file
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
-    seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+    seg.recover(newProducerStateManager())
     for(i <- 0 until 100) {
       assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
       if (i < 99)
@@ -460,7 +457,7 @@ class LogSegmentTest {
       val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
       val position = recordPosition.position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
-      seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+      seg.recover(newProducerStateManager())
       assertEquals((0 until offsetToBeginCorruption).toList, seg.log.batches.asScala.map(_.lastOffset).toList,
         "Should have truncated off bad messages.")
       seg.deleteIfExists()
@@ -587,4 +584,14 @@ class LogSegmentTest {
     Utils.delete(tempDir)
   }
 
+  private def newProducerStateManager(): ProducerStateManager = {
+    new ProducerStateManager(
+      topicPartition,
+      logDir,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      time = new MockTime()
+    )
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 7140477..508d969 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -1011,7 +1011,7 @@ class ProducerStateManagerTest {
   @Test
   def testRemoveAndMarkSnapshotForDeletion(): Unit = {
     UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile()
-    val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, time = time)
+    val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
     assertTrue(manager.latestSnapshotOffset.isDefined)
     val snapshot = manager.removeAndMarkSnapshotForDeletion(5).get
     assertTrue(snapshot.file.toPath.toString.endsWith(UnifiedLog.DeletedFileSuffix))
@@ -1029,7 +1029,7 @@ class ProducerStateManagerTest {
   def testRemoveAndMarkSnapshotForDeletionAlreadyDeleted(): Unit = {
     val file = UnifiedLog.producerSnapshotFile(logDir, 5)
     file.createNewFile()
-    val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, time = time)
+    val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
     assertTrue(manager.latestSnapshotOffset.isDefined)
     Files.delete(file.toPath)
     assertTrue(manager.removeAndMarkSnapshotForDeletion(5).isEmpty)