You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 20:26:11 UTC
kafka git commit: MINOR: Fix error logged if not enough alive brokers
for transactions state topic
Repository: kafka
Updated Branches:
refs/heads/trunk a7671c7f3 -> cfb238674
MINOR: Fix error logged if not enough alive brokers for transactions state topic
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2954 from ijuma/fix-error-message-if-transactions-topic-replication-factor-too-low
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfb23867
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfb23867
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfb23867
Branch: refs/heads/trunk
Commit: cfb2386743cc1981d7eb5c042ae8cbad70f99370
Parents: a7671c7
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 3 13:26:08 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 3 13:26:08 2017 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 46 ++++++++++++--------
1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cfb23867/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1e1f0d5..3d821f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -802,27 +802,35 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = {
- if (topic == null) throw new IllegalArgumentException("topic must not be null")
+ if (topic == null)
+ throw new IllegalArgumentException("topic must not be null")
val aliveBrokers = metadataCache.getAliveBrokers
- val requiredReplicas = if (topic == GroupMetadataTopicName)
- config.offsetsTopicReplicationFactor
- else
- config.transactionTopicReplicationFactor
-
- if (aliveBrokers.size < requiredReplicas) {
- error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
- s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
- s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
- s"and not all brokers are up yet.")
- new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
- } else {
- if (topic == GroupMetadataTopicName)
- createTopic(topic, config.offsetsTopicPartitions,
- config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs)
- else
- createTopic(topic, config.transactionTopicPartitions,
- config.transactionTopicReplicationFactor.toInt, txnCoordinator.transactionTopicConfigs)
+
+ topic match {
+ case GroupMetadataTopicName =>
+ if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+ error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
+ s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
+ s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
+ s"and not all brokers are up yet.")
+ new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+ } else {
+ createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
+ groupCoordinator.offsetsTopicConfigs)
+ }
+ case TransactionStateTopicName =>
+ if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
+ error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
+ s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
+ s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
+ s"and not all brokers are up yet.")
+ new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+ } else {
+ createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
+ txnCoordinator.transactionTopicConfigs)
+ }
+ case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
}
}