You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/11 12:37:05 UTC

kafka git commit: KAFKA-5182; Close txn coordinator threads during broker shutdown

Repository: kafka
Updated Branches:
  refs/heads/trunk bcf447e93 -> 3085d4f43


KAFKA-5182; Close txn coordinator threads during broker shutdown

Shut down the delayed delete-records purgatory thread, the transaction marker purgatory
thread, and the send thread in `TransactionMarkerChannelManager` during broker shutdown.
Made `InterBrokerSendThread` interruptible by default so that it can be shut down.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3014 from rajinisivaram/KAFKA-5182


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3085d4f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3085d4f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3085d4f4

Branch: refs/heads/trunk
Commit: 3085d4f435bba74fc9aa077e23a4da3475b4d2ec
Parents: bcf447e
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu May 11 13:16:57 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 11 13:19:50 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/InterBrokerSendThread.scala      |  5 +++--
 .../transaction/TransactionCoordinator.scala      |  1 +
 .../scala/kafka/server/DelayedOperation.scala     |  2 +-
 .../src/main/scala/kafka/server/KafkaServer.scala | 18 ++++++++++--------
 .../main/scala/kafka/server/ReplicaManager.scala  |  1 +
 5 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index b626954..217aa80 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -29,8 +29,9 @@ import org.apache.kafka.common.utils.Time
 class InterBrokerSendThread(name: String,
                             networkClient: NetworkClient,
                             requestGenerator: () => Iterable[RequestAndCompletionHandler],
-                            time: Time)
-  extends ShutdownableThread(name, isInterruptible = false) {
+                            time: Time,
+                            isInterruptible: Boolean = true)
+  extends ShutdownableThread(name, isInterruptible) {
 
   override def doWork() {
     val now = time.milliseconds()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 46c061e..38e725f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -406,6 +406,7 @@ class TransactionCoordinator(brokerId: Int,
     pidManager.shutdown()
     txnManager.shutdown()
     txnMarkerChannelManager.shutdown()
+    txnMarkerPurgatory.shutdown()
     info("Shutdown complete.")
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 7436904..c0efc53 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -393,7 +393,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * A background reaper to expire delayed operations that have timed out
    */
   private class ExpiredOperationReaper extends ShutdownableThread(
-    "ExpirationReaper-%d".format(brokerId),
+    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
     false) {
 
     override def doWork() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3c681e4..431d192 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -589,25 +589,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
         CoreUtils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(socketServer != null)
+        if (socketServer != null)
           CoreUtils.swallow(socketServer.shutdown())
-        if(requestHandlerPool != null)
+        if (requestHandlerPool != null)
           CoreUtils.swallow(requestHandlerPool.shutdown())
         CoreUtils.swallow(kafkaScheduler.shutdown())
-        if(apis != null)
+        if (apis != null)
           CoreUtils.swallow(apis.close())
         CoreUtils.swallow(authorizer.foreach(_.close()))
-        if(replicaManager != null)
+        if (replicaManager != null)
           CoreUtils.swallow(replicaManager.shutdown())
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown())
-        if(groupCoordinator != null)
+        if (transactionCoordinator != null)
+          CoreUtils.swallow(transactionCoordinator.shutdown())
+        if (groupCoordinator != null)
           CoreUtils.swallow(groupCoordinator.shutdown())
-        if(logManager != null)
+        if (logManager != null)
           CoreUtils.swallow(logManager.shutdown())
-        if(kafkaController != null)
+        if (kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown())
-        if(zkUtils != null)
+        if (zkUtils != null)
           CoreUtils.swallow(zkUtils.close())
         if (metrics != null)
           CoreUtils.swallow(metrics.close())

http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 663ab1e..99c1b45 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1096,6 +1096,7 @@ class ReplicaManager(val config: KafkaConfig,
     replicaFetcherManager.shutdown()
     delayedFetchPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
+    delayedDeleteRecordsPurgatory.shutdown()
     if (checkpointHW)
       checkpointHighWatermarks()
     info("Shut down completely")