You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/13 20:17:19 UTC

[kafka] branch trunk updated: KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (#8293)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 454c3cf  KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (#8293)
454c3cf is described below

commit 454c3cf6171d151a1ecf37d28a800ae254d375ca
Author: Kowshik Prakasam <ko...@gmail.com>
AuthorDate: Fri Mar 13 13:16:45 2020 -0700

    KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (#8293)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/coordinator/transaction/TransactionCoordinator.scala  | 2 +-
 .../scala/kafka/coordinator/transaction/TransactionStateManager.scala | 4 +---
 .../transaction/TransactionCoordinatorConcurrencyTest.scala           | 3 +--
 .../kafka/coordinator/transaction/TransactionStateManagerTest.scala   | 2 +-
 4 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 5266437..b88dc53 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -56,7 +56,7 @@ object TransactionCoordinator {
     val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
       reaperEnabled = false, timerEnabled = false)
     val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig,
-      time, metrics, config.interBrokerProtocolVersion)
+      time, metrics)
 
     val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
     val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 174e3a5..ceed6ac 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.api.ApiVersion
 import kafka.log.{AppendOrigin, LogConfig}
 import kafka.message.UncompressedCodec
 import kafka.server.{Defaults, FetchLogEnd, ReplicaManager}
@@ -74,8 +73,7 @@ class TransactionStateManager(brokerId: Int,
                               replicaManager: ReplicaManager,
                               config: TransactionConfig,
                               time: Time,
-                              metrics: Metrics,
-                              interBrokerProtocolVersion: ApiVersion) extends Logging {
+                              metrics: Metrics) extends Logging {
 
   this.logIdent = "[Transaction State Manager " + brokerId + "]: "
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 6b05ef3..9422748 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
 
-import kafka.api.KAFKA_2_4_IV1
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.transaction.TransactionCoordinatorConcurrencyTest._
@@ -73,7 +72,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
     EasyMock.replay(zkClient)
 
     txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time,
-      new Metrics(), KAFKA_2_4_IV1)
+      new Metrics())
     for (i <- 0 until numPartitions)
       txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]())
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index dab9181..cb3c2fe 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -67,7 +67,7 @@ class TransactionStateManagerTest {
 
   val txnConfig = TransactionConfig()
   val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler,
-    replicaManager, txnConfig, time, metrics, KAFKA_2_4_IV1)
+    replicaManager, txnConfig, time, metrics)
 
   val transactionalId1: String = "one"
   val transactionalId2: String = "two"