You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/24 00:38:58 UTC
[pulsar] branch master updated: [Broker] Correct param of delete method for v1 topic (#12936)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6d9df8 [Broker] Correct param of delete method for v1 topic (#12936)
e6d9df8 is described below
commit e6d9df81c446870107dbb8d8e454b11b71cc9255
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Nov 24 08:37:18 2021 +0800
[Broker] Correct param of delete method for v1 topic (#12936)
---
.../pulsar/broker/admin/v1/PersistentTopics.java | 10 ++-
.../java/org/apache/pulsar/schema/SchemaTest.java | 90 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 2917482..09bb34b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -264,10 +264,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeletePartitionedTopic(asyncResponse, authoritative, force, false);
+ internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
@@ -302,9 +303,10 @@ public class PersistentTopics extends PersistentTopicsBase {
public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeleteTopic(authoritative, force);
+ internalDeleteTopic(authoritative, force, deleteSchema);
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 0a4512d..5c83361 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -776,6 +776,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testDeleteTopicAndSchemaForV1() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String cluster = CLUSTER_NAME;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicOne = "not-partitioned-topic";
+ final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic";
+
+ // persistent, non-partitioned v1/topic
+ final String topic1 = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ cluster,
+ namespace,
+ topicOne).toString();
+
+ // persistent, partitioned v1/topic
+ admin.topics().createPartitionedTopic(topic2, 1);
+
+ @Cleanup
+ Producer<Schemas.PersonOne> p1_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+ .topic(topic1)
+ .create();
+
+ @Cleanup
+ Producer<Schemas.PersonThree> p1_2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+ .topic(topic1)
+ .create();
+ @Cleanup
+ Producer<Schemas.PersonThree> p2_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+ .topic(topic2)
+ .create();
+
+ List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures1 =
+ this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic1).getSchemaName()).get();
+ FutureUtil.waitForAll(schemaFutures1).get();
+ List<SchemaRegistry.SchemaAndMetadata> schemas1 = schemaFutures1.stream().map(future -> {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ return null;
+ }
+ }).collect(Collectors.toList());
+ assertEquals(schemas1.size(), 2);
+ for (SchemaRegistry.SchemaAndMetadata schema : schemas1) {
+ assertNotNull(schema);
+ }
+
+ List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures2 =
+ this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic2).getSchemaName()).get();
+ FutureUtil.waitForAll(schemaFutures2).get();
+ List<SchemaRegistry.SchemaAndMetadata> schemas2 = schemaFutures2.stream().map(future -> {
+ try {
+ return future.get();
+ } catch (Exception e) {
+ return null;
+ }
+ }).collect(Collectors.toList());
+ assertEquals(schemas2.size(), 1);
+ for (SchemaRegistry.SchemaAndMetadata schema : schemas2) {
+ assertNotNull(schema);
+ }
+
+ // without force and without delete-schema: deleting a topic that has active producers must fail and keep its schemas
+ try {
+ admin.topics().delete(topic1, false, false);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+ }
+ assertEquals(this.getPulsar().getSchemaRegistryService()
+ .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
+ try {
+ admin.topics().deletePartitionedTopic(topic2, false, false);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+ }
+ assertEquals(this.getPulsar().getSchemaRegistryService()
+ .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1);
+
+ // with force and delete-schema: deleting the topic succeeds and leaves no schemas behind
+ admin.topics().delete(topic1, true, true);
+ assertEquals(this.getPulsar().getSchemaRegistryService()
+ .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 0);
+ admin.topics().deletePartitionedTopic(topic2, true, true);
+ assertEquals(this.getPulsar().getSchemaRegistryService()
+ .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 0);
+ }
+
+ @Test
public void testProducerMultipleSchemaMessages() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);