You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/03 20:26:11 UTC

kafka git commit: MINOR: Fix error logged if not enough alive brokers for transactions state topic

Repository: kafka
Updated Branches:
  refs/heads/trunk a7671c7f3 -> cfb238674


MINOR: Fix error logged if not enough alive brokers for transactions state topic

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2954 from ijuma/fix-error-message-if-transactions-topic-replication-factor-too-low


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cfb23867
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cfb23867
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cfb23867

Branch: refs/heads/trunk
Commit: cfb2386743cc1981d7eb5c042ae8cbad70f99370
Parents: a7671c7
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed May 3 13:26:08 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed May 3 13:26:08 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 46 ++++++++++++--------
 1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cfb23867/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1e1f0d5..3d821f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -802,27 +802,35 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = {
-    if (topic == null) throw new IllegalArgumentException("topic must not be null")
+    if (topic == null)
+      throw new IllegalArgumentException("topic must not be null")
 
     val aliveBrokers = metadataCache.getAliveBrokers
-    val requiredReplicas = if (topic == GroupMetadataTopicName)
-      config.offsetsTopicReplicationFactor
-    else
-      config.transactionTopicReplicationFactor
-
-    if (aliveBrokers.size < requiredReplicas) {
-      error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
-        s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
-        s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
-        s"and not all brokers are up yet.")
-      new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
-    } else {
-      if (topic == GroupMetadataTopicName)
-        createTopic(topic, config.offsetsTopicPartitions,
-          config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs)
-      else
-        createTopic(topic, config.transactionTopicPartitions,
-          config.transactionTopicReplicationFactor.toInt, txnCoordinator.transactionTopicConfigs)
+
+    topic match {
+      case GroupMetadataTopicName =>
+        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
+            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
+            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
+            s"and not all brokers are up yet.")
+          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+        } else {
+          createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt,
+            groupCoordinator.offsetsTopicConfigs)
+        }
+      case TransactionStateTopicName =>
+        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
+          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " +
+            s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " +
+            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
+            s"and not all brokers are up yet.")
+          new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList())
+        } else {
+          createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt,
+            txnCoordinator.transactionTopicConfigs)
+        }
+      case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic")
     }
   }