You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/24 00:38:58 UTC

[pulsar] branch master updated: [Broker] Correct param of delete method for v1 topic (#12936)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6d9df8  [Broker] Correct param of delete method for v1 topic (#12936)
e6d9df8 is described below

commit e6d9df81c446870107dbb8d8e454b11b71cc9255
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Nov 24 08:37:18 2021 +0800

    [Broker] Correct param of delete method for v1 topic (#12936)
---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 10 ++-
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 90 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 2917482..09bb34b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -264,10 +264,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalDeletePartitionedTopic(asyncResponse, authoritative, force, false);
+            internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
@@ -302,9 +303,10 @@ public class PersistentTopics extends PersistentTopicsBase {
     public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteTopic(authoritative, force);
+        internalDeleteTopic(authoritative, force, deleteSchema);
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 0a4512d..5c83361 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -776,6 +776,96 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testDeleteTopicAndSchemaForV1() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String cluster = CLUSTER_NAME;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "not-partitioned-topic";
+        final String topic2 = "persistent://" + tenant + "/" + cluster + "/" + namespace + "/partitioned-topic";
+
+        // persistent, not-partitioned v1/topic
+        final String topic1 = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                cluster,
+                namespace,
+                topicOne).toString();
+
+        // persistent, partitioned v1/topic
+        admin.topics().createPartitionedTopic(topic2, 1);
+
+        @Cleanup
+        Producer<Schemas.PersonOne> p1_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topic1)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> p1_2 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic1)
+                .create();
+        @Cleanup
+        Producer<Schemas.PersonThree> p2_1 = pulsarClient.newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic2)
+                .create();
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures1 =
+                this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic1).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures1).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas1 = schemaFutures1.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+        assertEquals(schemas1.size(), 2);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas1) {
+            assertNotNull(schema);
+        }
+
+        List<CompletableFuture<SchemaRegistry.SchemaAndMetadata>> schemaFutures2 =
+                this.getPulsar().getSchemaRegistryService().getAllSchemas(TopicName.get(topic2).getSchemaName()).get();
+        FutureUtil.waitForAll(schemaFutures2).get();
+        List<SchemaRegistry.SchemaAndMetadata> schemas2 = schemaFutures2.stream().map(future -> {
+            try {
+                return future.get();
+            } catch (Exception e) {
+                return null;
+            }
+        }).collect(Collectors.toList());
+        assertEquals(schemas2.size(), 1);
+        for (SchemaRegistry.SchemaAndMetadata schema : schemas2) {
+            assertNotNull(schema);
+        }
+
+        // not-force and not-delete-schema when delete topic
+        try {
+            admin.topics().delete(topic1, false, false);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+        }
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
+        try {
+            admin.topics().deletePartitionedTopic(topic2, false, false);
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+        }
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 1);
+
+        // force and delete-schema when delete topic
+        admin.topics().delete(topic1, true, true);
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                .trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 0);
+        admin.topics().deletePartitionedTopic(topic2, true, true);
+        assertEquals(this.getPulsar().getSchemaRegistryService()
+                .trimDeletedSchemaAndGetList(TopicName.get(topic2).getSchemaName()).get().size(), 0);
+    }
+
+    @Test
     public void testProducerMultipleSchemaMessages() throws Exception {
         final String tenant = PUBLIC_TENANT;
         final String namespace = "test-namespace-" + randomName(16);