You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/13 20:17:19 UTC
[kafka] branch trunk updated: KAFKA-9714;
Eliminate unused reference to IBP in `TransactionStateManager`
(#8293)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 454c3cf KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (#8293)
454c3cf is described below
commit 454c3cf6171d151a1ecf37d28a800ae254d375ca
Author: Kowshik Prakasam <ko...@gmail.com>
AuthorDate: Fri Mar 13 13:16:45 2020 -0700
KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (#8293)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/coordinator/transaction/TransactionCoordinator.scala | 2 +-
.../scala/kafka/coordinator/transaction/TransactionStateManager.scala | 4 +---
.../transaction/TransactionCoordinatorConcurrencyTest.scala | 3 +--
.../kafka/coordinator/transaction/TransactionStateManagerTest.scala | 2 +-
4 files changed, 4 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 5266437..b88dc53 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -56,7 +56,7 @@ object TransactionCoordinator {
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
reaperEnabled = false, timerEnabled = false)
val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig,
- time, metrics, config.interBrokerProtocolVersion)
+ time, metrics)
val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 174e3a5..ceed6ac 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.api.ApiVersion
import kafka.log.{AppendOrigin, LogConfig}
import kafka.message.UncompressedCodec
import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
@@ -74,8 +73,7 @@ class TransactionStateManager(brokerId: Int,
replicaManager: ReplicaManager,
config: TransactionConfig,
time: Time,
- metrics: Metrics,
- interBrokerProtocolVersion: ApiVersion) extends Logging {
+ metrics: Metrics) extends Logging {
this.logIdent = "[Transaction State Manager " + brokerId + "]: "
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 6b05ef3..9422748 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
import java.nio.ByteBuffer
-import kafka.api.KAFKA_2_4_IV1
import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
@@ -73,7 +72,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
EasyMock.replay(zkClient)
txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time,
- new Metrics(), KAFKA_2_4_IV1)
+ new Metrics())
for (i <- 0 until numPartitions)
txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]())
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index dab9181..cb3c2fe 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -67,7 +67,7 @@ class TransactionStateManagerTest {
val txnConfig = TransactionConfig()
val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler,
- replicaManager, txnConfig, time, metrics, KAFKA_2_4_IV1)
+ replicaManager, txnConfig, time, metrics)
val transactionalId1: String = "one"
val transactionalId2: String = "two"