You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/22 23:15:05 UTC

kafka git commit: KAFKA-4032; Uncaught exceptions when autocreating topics

Repository: kafka
Updated Branches:
  refs/heads/trunk f90321553 -> 05d00b5ac


KAFKA-4032; Uncaught exceptions when autocreating topics

Handled by adding a catch-all for any otherwise unhandled exception. Because the JIRA specifically mentions the InvalidReplicationFactor exception, a test was added for that specific case.

Author: Grant Henke <gr...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #1739 from granthenke/create-errors


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05d00b5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05d00b5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05d00b5a

Branch: refs/heads/trunk
Commit: 05d00b5aca2e1e59ad685a3f051d2ab022f75acc
Parents: f903215
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Aug 23 00:14:44 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Aug 23 00:14:44 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  4 ++--
 .../integration/BaseTopicMetadataTest.scala     | 25 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6eb574f..0a5258e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -643,8 +643,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       case e: TopicExistsException => // let it go, possibly another broker created this topic
         new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
-      case itex: InvalidTopicException =>
-        new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, Topic.isInternal(topic),
+      case ex: Throwable  => // Catch all to prevent unhandled errors
+        new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic),
           java.util.Collections.emptyList())
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 7c9f3ae..24ed954 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -133,6 +133,31 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testAutoCreateTopicWithInvalidReplication {
+    val adHocProps = createBrokerConfig(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile)
+    // Set default replication higher than the number of live brokers
+    adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
+    // start adHoc brokers with replication factor too high
+    val adHocServer = createServer(new KafkaConfig(adHocProps))
+    // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
+    // `securityProtocol` instead of PLAINTEXT below
+    val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
+      adHocServer.boundPort(SecurityProtocol.PLAINTEXT))
+
+    // auto create topic on "bad" endpoint
+    val topic = "testAutoCreateTopic"
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    adHocServer.shutdown()
+  }
+
+  @Test
   def testAutoCreateTopicWithCollision {
     // auto create topic
     val topic1 = "testAutoCreate_Topic"