You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "kic (JIRA)" <ji...@apache.org> on 2017/12/03 17:04:00 UTC
[jira] [Updated] (KAFKA-6302) Topic can not be recreated after it
is deleted
[ https://issues.apache.org/jira/browse/KAFKA-6302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kic updated KAFKA-6302:
-----------------------
Description:
I use an embedded kafka for unit test. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.
{code}
// needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
package kic.kafka.embedded
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._
import scala.collection.JavaConverters._
class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
val props = new Properties()
val testTopic = "test-topic"
"The admin client" should "be able to create, delete and re-create topics" in {
props.setProperty("bootstrap.servers", "localhost:10001")
props.setProperty("delete.enable.topic", "true")
props.setProperty("group.id", "test-client")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("clinet.id", "test-client")
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
EmbeddedKafaJavaWrapper.start(10001, 10002, props)
try {
implicit val admin = AdminClient.create(props)
// create topic and confirm it exists
createTopic(testTopic)
val topics = listTopics()
info(s"topics: $topics")
topics should contain(testTopic)
// now we should be able to send something to this topic
// TODO create producer and send something
// delete topic
deleteTopic(testTopic)
listTopics() shouldNot contain(testTopic)
// recreate topic
createTopic(testTopic)
// listTopics() should contain(testTopic)
// and finally consume from the topic and expect to get 0 entries
// TODO create consumer and poll once
} finally {
EmbeddedKafaJavaWrapper.stop()
}
}
def listTopics()(implicit admin: AdminClient) =
admin.listTopics().names().get()
def createTopic(topic: String)(implicit admin: AdminClient) =
admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
def deleteTopic(topic: String)(implicit admin: AdminClient) =
admin.deleteTopics(Seq("test-topic").asJava).all().get()
}
{code}
Btw, what happens to connected producers/consumers when I delete a topic?
was:
I use an embedded kafka for unit test. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.
{code}
// needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
package kic.kafka.embedded
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.scalatest._
import scala.collection.JavaConverters._
class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
val props = new Properties()
val testTopic = "test-topic"
"A running server" should "return a list of topics" in {
props.setProperty("bootstrap.servers", "localhost:10001")
props.setProperty("delete.enable.topic", "true")
props.setProperty("group.id", "test-client")
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("clinet.id", "test-client")
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
EmbeddedKafaJavaWrapper.start(10001, 10002, props)
try {
implicit val admin = AdminClient.create(props)
// create topic and confirm it exists
createTopic(testTopic)
val topics = listTopics()
info(s"topics: $topics")
topics should contain(testTopic)
// now we should be able to send something to this topic
// TODO create producer and send something
// delete topic
deleteTopic(testTopic)
listTopics() shouldNot contain(testTopic)
// recreate topic
createTopic(testTopic)
// listTopics() should contain(testTopic)
// and finally consume from the topic and expect to get 0 entries
// TODO create consumer and poll once
} finally {
EmbeddedKafaJavaWrapper.stop()
}
}
def listTopics()(implicit admin: AdminClient) =
admin.listTopics().names().get()
def createTopic(topic: String)(implicit admin: AdminClient) =
admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
def deleteTopic(topic: String)(implicit admin: AdminClient) =
admin.deleteTopics(Seq("test-topic").asJava).all().get()
}
{code}
Btw, what happens to connected consumers when I delete a topic?
> Topic can not be recreated after it is deleted
> ----------------------------------------------
>
> Key: KAFKA-6302
> URL: https://issues.apache.org/jira/browse/KAFKA-6302
> Project: Kafka
> Issue Type: Bug
> Components: admin, clients
> Affects Versions: 1.0.0
> Reporter: kic
>
> I use an embedded kafka for unit test. My application relies on the ability to recreate topics programmatically. Currently it is not possible to re-create a topic after it has been deleted.
> {code}
> // needs compile time depedency 'net.manub:scalatest-embedded-kafka_2.11:1.0.0'
> package kic.kafka.embedded
> import java.util.Properties
> import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
> import org.scalatest._
> import scala.collection.JavaConverters._
> class EmbeddedKafaJavaWrapperTest extends FlatSpec with Matchers {
> val props = new Properties()
> val testTopic = "test-topic"
> "The admin client" should "be able to create, delete and re-create topics" in {
> props.setProperty("bootstrap.servers", "localhost:10001")
> props.setProperty("delete.enable.topic", "true")
> props.setProperty("group.id", "test-client")
> props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer")
> props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
> props.setProperty("clinet.id", "test-client")
> props.setProperty("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
> props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
> EmbeddedKafaJavaWrapper.start(10001, 10002, props)
> try {
> implicit val admin = AdminClient.create(props)
> // create topic and confirm it exists
> createTopic(testTopic)
> val topics = listTopics()
> info(s"topics: $topics")
> topics should contain(testTopic)
> // now we should be able to send something to this topic
> // TODO create producer and send something
> // delete topic
> deleteTopic(testTopic)
> listTopics() shouldNot contain(testTopic)
> // recreate topic
> createTopic(testTopic)
> // listTopics() should contain(testTopic)
> // and finally consume from the topic and expect to get 0 entries
> // TODO create consumer and poll once
> } finally {
> EmbeddedKafaJavaWrapper.stop()
> }
> }
> def listTopics()(implicit admin: AdminClient) =
> admin.listTopics().names().get()
> def createTopic(topic: String)(implicit admin: AdminClient) =
> admin.createTopics(Seq(new NewTopic(topic, 1, 1)).asJava)
> def deleteTopic(topic: String)(implicit admin: AdminClient) =
> admin.deleteTopics(Seq("test-topic").asJava).all().get()
> }
> {code}
> Btw, what happens to connected producers/consumers when I delete a topic?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)