Posted to jira@kafka.apache.org by "kic (JIRA)" <ji...@apache.org> on 2017/12/03 17:02:00 UTC
[jira] [Created] (KAFKA-6302) Topic can not be recreated after it is deleted
kic created KAFKA-6302:
--------------------------
Summary: Topic can not be recreated after it is deleted
Key: KAFKA-6302
URL: https://issues.apache.org/jira/browse/KAFKA-6302
Project: Kafka
Issue Type: Bug
Components: admin, clients
Affects Versions: 1.0.0
Reporter: kic
I use an embedded Kafka for unit tests. My application relies on the ability to recreate topics programmatically. Currently, it is not possible to recreate a topic after it has been deleted.
{code}
// needs compile-time dependency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
package kic.kafka.embedded

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._
import scala.collection.JavaConverters._

class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
  val props = new Properties()
  val testTopic = "test-topic"

  "A running server" should "return a list of topics" in {
    props.setProperty("bootstrap.servers", "localhost:10001")
    // broker setting that permits topic deletion
    props.setProperty("delete.topic.enable", "true")
    props.setProperty("group.id", "test-client")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("client.id", "test-client")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    EmbeddedKafaJavaWrapper.start(10001, 10002, props)

    try {
      implicit val admin = AdminClient.create(props)

      // create topic and confirm it exists
      createTopic(testTopic)
      val topics = listTopics()
      info(s"topics: $topics")
      topics should contain(testTopic)

      // now we should be able to send something to this topic
      // TODO create producer and send something

      // delete topic
      deleteTopic(testTopic)
      listTopics() shouldNot contain(testTopic)

      // recreate topic
      createTopic(testTopic)
      // listTopics() should contain(testTopic)

      // and finally consume from the topic and expect to get 0 entries
      // TODO create consumer and poll once
    } finally {
      EmbeddedKafaJavaWrapper.stop()
    }
  }

  // names of all topics currently visible to the admin client
  def listTopics()(implicit admin: AdminClient) =
    admin.listTopics().names().get()

  // requests creation of a topic with 1 partition and replication factor 1;
  // note that the returned CreateTopicsResult is not awaited here
  def createTopic(topic: String)(implicit admin: AdminClient) =
    admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)

  // requests deletion of the given topic and blocks until the request completes
  def deleteTopic(topic: String)(implicit admin: AdminClient) =
    admin.deleteTopics(Seq(topic).asJava).all().get()
}
{code}
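One way to narrow down whether this is a timing issue is to poll until the deleted topic has actually disappeared before recreating it. The AdminClient documentation notes that it may take several seconds after a delete or create request succeeds before all brokers are aware of the change. The sketch below is only a generic polling helper under that assumption. The names `TopicWait` and `awaitCondition` and the timeout values are hypothetical and are not part of Kafka or of the embedded Kafka library.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object TopicWait {
  /** Evaluates `condition` repeatedly, sleeping `interval` between attempts,
    * until it returns true or `timeout` has elapsed.
    * Returns true if the condition held before the deadline, false otherwise.
    * (Hypothetical helper, not a Kafka API.)
    */
  def awaitCondition(timeout: FiniteDuration, interval: FiniteDuration = 100.millis)
                    (condition: => Boolean): Boolean = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(): Boolean =
      if (condition) true
      else if (deadline.isOverdue()) false
      else {
        Thread.sleep(interval.toMillis)
        loop()
      }

    loop()
  }
}

// Possible use inside the test above (assumes the listTopics/deleteTopic/createTopic helpers):
//   deleteTopic(testTopic)
//   TopicWait.awaitCondition(10.seconds) { !listTopics().contains(testTopic) }
//   createTopic(testTopic)
```

If the recreate step still fails after the topic is no longer listed, the problem is probably not just a delay in the brokers noticing the deletion.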
Btw, what happens to connected consumers when I delete a topic?