You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/22 23:15:05 UTC
kafka git commit: KAFKA-4032;
Uncaught exceptions when autocreating topics
Repository: kafka
Updated Branches:
refs/heads/trunk f90321553 -> 05d00b5ac
KAFKA-4032; Uncaught exceptions when autocreating topics
Handled by adding a catch-all for any otherwise unhandled exception. Because the JIRA specifically mentions the InvalidReplicationFactor exception, a test was added for that specific case.
Author: Grant Henke <gr...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #1739 from granthenke/create-errors
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05d00b5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05d00b5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05d00b5a
Branch: refs/heads/trunk
Commit: 05d00b5aca2e1e59ad685a3f051d2ab022f75acc
Parents: f903215
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Aug 23 00:14:44 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Aug 23 00:14:44 2016 +0100
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 4 ++--
.../integration/BaseTopicMetadataTest.scala | 25 ++++++++++++++++++++
2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6eb574f..0a5258e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -643,8 +643,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: TopicExistsException => // let it go, possibly another broker created this topic
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
- case itex: InvalidTopicException =>
- new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, topic, Topic.isInternal(topic),
+ case ex: Throwable => // Catch all to prevent unhandled errors
+ new MetadataResponse.TopicMetadata(Errors.forException(ex), topic, Topic.isInternal(topic),
java.util.Collections.emptyList())
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/05d00b5a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 7c9f3ae..24ed954 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -133,6 +133,31 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
}
@Test
+ def testAutoCreateTopicWithInvalidReplication {
+ val adHocProps = createBrokerConfig(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+ trustStoreFile = trustStoreFile)
+ // Set default replication higher than the number of live brokers
+ adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
+ // start adHoc brokers with replication factor too high
+ val adHocServer = createServer(new KafkaConfig(adHocProps))
+ // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
+ // `securityProtocol` instead of PLAINTEXT below
+ val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
+ adHocServer.boundPort(SecurityProtocol.PLAINTEXT))
+
+ // auto create topic on "bad" endpoint
+ val topic = "testAutoCreateTopic"
+ val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
+ 2000,0).topicsMetadata
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode)
+ assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+ assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+ assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+ adHocServer.shutdown()
+ }
+
+ @Test
def testAutoCreateTopicWithCollision {
// auto create topic
val topic1 = "testAutoCreate_Topic"