You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/05 19:01:47 UTC
[kafka] branch trunk updated: MINOR: Do not use optional args in `ProducerStateManager` (#11734)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba0fe61 MINOR: Do not use optional args in `ProducerStateManager` (#11734)
ba0fe61 is described below
commit ba0fe610eddf2710cdb404f198a9886c01293de3
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sat Feb 5 11:00:17 2022 -0800
MINOR: Do not use optional args in `ProducerStateManager` (#11734)
We allowed `maxProducerIdExpirationMs` and `time` to be optional in the `ProducerStateManager` constructor. We generally frown on optional arguments since it is too easy to overlook them. In this case, it was especially dangerous because the recently added `maxTransactionTimeoutMs` argument used the same type as `maxProducerIdExpirationMs`.
Reviewers: David Jacot <david.jacot@gmail.com, Ismael Juma <is...@juma.me.uk>
---
.../scala/kafka/log/ProducerStateManager.scala | 12 +++++----
core/src/main/scala/kafka/log/UnifiedLog.scala | 4 +--
.../unit/kafka/cluster/PartitionLockTest.scala | 8 +++++-
.../scala/unit/kafka/cluster/PartitionTest.scala | 9 +++++--
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 29 ++++++++++++++--------
.../unit/kafka/log/ProducerStateManagerTest.scala | 4 +--
6 files changed, 43 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 8313f24..60d2d28 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -483,11 +483,13 @@ object ProducerStateManager {
* been deleted.
*/
@nonthreadsafe
-class ProducerStateManager(val topicPartition: TopicPartition,
- @volatile var _logDir: File,
- val maxTransactionTimeoutMs: Int,
- val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
- val time: Time = Time.SYSTEM) extends Logging {
+class ProducerStateManager(
+ val topicPartition: TopicPartition,
+ @volatile var _logDir: File,
+ val maxTransactionTimeoutMs: Int,
+ val maxProducerIdExpirationMs: Int,
+ val time: Time
+) extends Logging {
import ProducerStateManager._
import java.util
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 11f66aa..3742d63 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1775,7 +1775,7 @@ object UnifiedLog extends Logging {
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
- time: Time = Time.SYSTEM,
+ time: Time,
maxTransactionTimeoutMs: Int,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
@@ -1794,7 +1794,7 @@ object UnifiedLog extends Logging {
config.recordVersion,
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
val producerStateManager = new ProducerStateManager(topicPartition, dir,
- maxTransactionTimeoutMs, maxProducerIdExpirationMs)
+ maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
val offsets = new LogLoader(
dir,
topicPartition,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8c13035..b55c62f 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -287,7 +287,13 @@ class PartitionLockTest extends Logging {
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeout = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
- val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxTransactionTimeout, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(
+ log.topicPartition,
+ log.dir,
+ maxTransactionTimeout,
+ maxProducerIdExpirationMs,
+ mockTime
+ )
val offsets = new LogLoader(
log.dir,
log.topicPartition,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index bcab6ec..972d9d5 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -214,8 +214,13 @@ class PartitionTest extends AbstractPartitionTest {
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
- val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir,
- maxTransactionTimeoutMs, maxProducerIdExpirationMs)
+ val producerStateManager = new ProducerStateManager(
+ log.topicPartition,
+ log.dir,
+ maxTransactionTimeoutMs,
+ maxProducerIdExpirationMs,
+ mockTime
+ )
val offsets = new LogLoader(
log.dir,
log.topicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 226330c..52d58f2 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -19,8 +19,7 @@ package kafka.log
import java.io.File
import kafka.server.checkpoints.LeaderEpochCheckpoint
-import kafka.server.epoch.EpochEntry
-import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
@@ -29,13 +28,11 @@ import org.apache.kafka.common.utils.{MockTime, Time, Utils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import scala.jdk.CollectionConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
class LogSegmentTest {
-
- private val maxTransactionTimeoutMs = 5 * 60 * 1000
private val topicPartition = new TopicPartition("topic", 0)
private val segments = mutable.ArrayBuffer[LogSegment]()
private var logDir: File = _
@@ -302,7 +299,7 @@ class LogSegmentTest {
seg.append(i, RecordBatch.NO_TIMESTAMP, -1L, records(i, i.toString))
val indexFile = seg.lazyOffsetIndex.file
TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
- seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+ seg.recover(newProducerStateManager())
for(i <- 0 until 100) {
val records = seg.read(i, 1, minOneMessage = true).records.records
assertEquals(i, records.iterator.next().offset)
@@ -342,7 +339,7 @@ class LogSegmentTest {
segment.append(largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
shallowOffsetOfMaxTimestamp = 107L, records = endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
- var stateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs)
+ var stateManager = newProducerStateManager()
segment.recover(stateManager)
assertEquals(108L, stateManager.mapEndOffset)
@@ -356,7 +353,7 @@ class LogSegmentTest {
assertEquals(100L, abortedTxn.lastStableOffset)
// recover again, but this time assuming the transaction from pid2 began on a previous segment
- stateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs)
+ stateManager = newProducerStateManager()
stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch,
0, RecordBatch.NO_TIMESTAMP, Some(75L)))
@@ -407,7 +404,7 @@ class LogSegmentTest {
shallowOffsetOfMaxTimestamp = 110, records = MemoryRecords.withRecords(110L, CompressionType.NONE, 2,
new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
- seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs), Some(cache))
+ seg.recover(newProducerStateManager(), Some(cache))
assertEquals(ArrayBuffer(EpochEntry(epoch = 0, startOffset = 104L),
EpochEntry(epoch = 1, startOffset = 106),
EpochEntry(epoch = 2, startOffset = 110)),
@@ -436,7 +433,7 @@ class LogSegmentTest {
seg.append(i, i * 10, i, records(i, i.toString))
val timeIndexFile = seg.lazyTimeIndex.file
TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
- seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+ seg.recover(newProducerStateManager())
for(i <- 0 until 100) {
assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
if (i < 99)
@@ -460,7 +457,7 @@ class LogSegmentTest {
val recordPosition = seg.log.searchForOffsetWithSize(offsetToBeginCorruption, 0)
val position = recordPosition.position + TestUtils.random.nextInt(15)
TestUtils.writeNonsenseToFile(seg.log.file, position, (seg.log.file.length - position).toInt)
- seg.recover(new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs))
+ seg.recover(newProducerStateManager())
assertEquals((0 until offsetToBeginCorruption).toList, seg.log.batches.asScala.map(_.lastOffset).toList,
"Should have truncated off bad messages.")
seg.deleteIfExists()
@@ -587,4 +584,14 @@ class LogSegmentTest {
Utils.delete(tempDir)
}
+ private def newProducerStateManager(): ProducerStateManager = {
+ new ProducerStateManager(
+ topicPartition,
+ logDir,
+ maxTransactionTimeoutMs = 5 * 60 * 1000,
+ maxProducerIdExpirationMs = 60 * 60 * 1000,
+ time = new MockTime()
+ )
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 7140477..508d969 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -1011,7 +1011,7 @@ class ProducerStateManagerTest {
@Test
def testRemoveAndMarkSnapshotForDeletion(): Unit = {
UnifiedLog.producerSnapshotFile(logDir, 5).createNewFile()
- val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, time = time)
+ val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
assertTrue(manager.latestSnapshotOffset.isDefined)
val snapshot = manager.removeAndMarkSnapshotForDeletion(5).get
assertTrue(snapshot.file.toPath.toString.endsWith(UnifiedLog.DeletedFileSuffix))
@@ -1029,7 +1029,7 @@ class ProducerStateManagerTest {
def testRemoveAndMarkSnapshotForDeletionAlreadyDeleted(): Unit = {
val file = UnifiedLog.producerSnapshotFile(logDir, 5)
file.createNewFile()
- val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, time = time)
+ val manager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, time)
assertTrue(manager.latestSnapshotOffset.isDefined)
Files.delete(file.toPath)
assertTrue(manager.removeAndMarkSnapshotForDeletion(5).isEmpty)