You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/11 12:37:05 UTC
kafka git commit: KAFKA-5182;
Close txn coordinator threads during broker shutdown
Repository: kafka
Updated Branches:
refs/heads/trunk bcf447e93 -> 3085d4f43
KAFKA-5182; Close txn coordinator threads during broker shutdown
Shutdown delayed delete purgatory thread, transaction marker purgatory thread and
send thread in `TransactionMarkerChannelManager` during broker shutdown.
Made `InterBrokerSendThread` interruptible so that it is shutdown.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3014 from rajinisivaram/KAFKA-5182
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3085d4f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3085d4f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3085d4f4
Branch: refs/heads/trunk
Commit: 3085d4f435bba74fc9aa077e23a4da3475b4d2ec
Parents: bcf447e
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu May 11 13:16:57 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu May 11 13:19:50 2017 +0100
----------------------------------------------------------------------
.../kafka/common/InterBrokerSendThread.scala | 5 +++--
.../transaction/TransactionCoordinator.scala | 1 +
.../scala/kafka/server/DelayedOperation.scala | 2 +-
.../src/main/scala/kafka/server/KafkaServer.scala | 18 ++++++++++--------
.../main/scala/kafka/server/ReplicaManager.scala | 1 +
5 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index b626954..217aa80 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -29,8 +29,9 @@ import org.apache.kafka.common.utils.Time
class InterBrokerSendThread(name: String,
networkClient: NetworkClient,
requestGenerator: () => Iterable[RequestAndCompletionHandler],
- time: Time)
- extends ShutdownableThread(name, isInterruptible = false) {
+ time: Time,
+ isInterruptible: Boolean = true)
+ extends ShutdownableThread(name, isInterruptible) {
override def doWork() {
val now = time.milliseconds()
http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 46c061e..38e725f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -406,6 +406,7 @@ class TransactionCoordinator(brokerId: Int,
pidManager.shutdown()
txnManager.shutdown()
txnMarkerChannelManager.shutdown()
+ txnMarkerPurgatory.shutdown()
info("Shutdown complete.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 7436904..c0efc53 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -393,7 +393,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
* A background reaper to expire delayed operations that have timed out
*/
private class ExpiredOperationReaper extends ShutdownableThread(
- "ExpirationReaper-%d".format(brokerId),
+ "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false) {
override def doWork() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3c681e4..431d192 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -589,25 +589,27 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
- if(socketServer != null)
+ if (socketServer != null)
CoreUtils.swallow(socketServer.shutdown())
- if(requestHandlerPool != null)
+ if (requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown())
CoreUtils.swallow(kafkaScheduler.shutdown())
- if(apis != null)
+ if (apis != null)
CoreUtils.swallow(apis.close())
CoreUtils.swallow(authorizer.foreach(_.close()))
- if(replicaManager != null)
+ if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown())
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown())
- if(groupCoordinator != null)
+ if (transactionCoordinator != null)
+ CoreUtils.swallow(transactionCoordinator.shutdown())
+ if (groupCoordinator != null)
CoreUtils.swallow(groupCoordinator.shutdown())
- if(logManager != null)
+ if (logManager != null)
CoreUtils.swallow(logManager.shutdown())
- if(kafkaController != null)
+ if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown())
- if(zkUtils != null)
+ if (zkUtils != null)
CoreUtils.swallow(zkUtils.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
http://git-wip-us.apache.org/repos/asf/kafka/blob/3085d4f4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 663ab1e..99c1b45 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1096,6 +1096,7 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.shutdown()
delayedFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
+ delayedDeleteRecordsPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
info("Shut down completely")