You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 21:53:29 UTC

[incubator-pulsar] branch master updated: Add admin api to delete topic forcefully (#1656)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40f3933  Add admin api to delete topic forcefully (#1656)
40f3933 is described below

commit 40f39333ccaf2497b4dbb3ed11665aaca7c5bf69
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue May 1 14:53:26 2018 -0700

    Add admin api to delete topic forcefully (#1656)
    
    * Add admin api to delete topic forcefully
    
    * Added option on CmdTopics as well
    
    * Fixed mocked test
    
    * Fixed other mock verification
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 24 +++++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  9 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 10 ++-
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +
 .../service/nonpersistent/NonPersistentTopic.java  | 90 +++++++++++++++-------
 .../broker/service/persistent/PersistentTopic.java |  3 +-
 .../broker/service/ReplicatorGlobalNSTest.java     | 33 ++++++--
 .../pulsar/client/admin/PersistentTopics.java      |  7 +-
 .../org/apache/pulsar/client/admin/Topics.java     | 57 ++++++++++++--
 .../pulsar/client/admin/internal/TopicsImpl.java   | 22 +++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  8 +-
 .../pulsar/admin/cli/CmdPersistentTopics.java      | 10 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 12 ++-
 13 files changed, 221 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 21b14d0..ff50ec1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -285,6 +286,17 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalDeleteTopicForcefully(boolean authoritative) {
+        validateAdminOperationOnTopic(true);
+        Topic topic = getTopicReference(topicName);
+        try {
+            topic.deleteForcefully().get();
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalRevokePermissionsOnTopic(String role) {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -394,7 +406,7 @@ public class PersistentTopicsBase extends AdminResource {
         return metadata;
     }
 
-    protected void internalDeletePartitionedTopic(boolean authoritative) {
+    protected void internalDeletePartitionedTopic(boolean authoritative, boolean force) {
         validateAdminAccessForTenant(topicName.getTenant());
         PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
         int numPartitions = partitionMetadata.partitions;
@@ -404,7 +416,7 @@ public class PersistentTopicsBase extends AdminResource {
             try {
                 for (int i = 0; i < numPartitions; i++) {
                     TopicName topicNamePartition = topicName.getPartition(i);
-                    pulsar().getAdminClient().topics().deleteAsync(topicNamePartition.toString())
+                    pulsar().getAdminClient().persistentTopics().deleteAsync(topicNamePartition.toString(), force)
                             .whenComplete((r, ex) -> {
                                 if (ex != null) {
                                     if (ex instanceof NotFoundException) {
@@ -465,6 +477,14 @@ public class PersistentTopicsBase extends AdminResource {
         unloadTopic(topicName, authoritative);
     }
 
+    protected void internalDeleteTopic(boolean authoritative, boolean force) {
+        if (force) {
+            internalDeleteTopicForcefully(authoritative);
+        } else {
+            internalDeleteTopic(authoritative);
+        }
+    }
+    
     protected void internalDeleteTopic(boolean authoritative) {
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 470ee45..33f5bf9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -181,9 +181,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Partitioned topic does not exist") })
     public void deletePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeletePartitionedTopic(authoritative);
+        internalDeletePartitionedTopic(authoritative, force);
     }
 
     @PUT
@@ -200,15 +201,17 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/{topic}")
-    @ApiOperation(hidden = true, value = "Delete a topic.", notes = "The topic cannot be deleted if there's any active subscription or producer connected to the it.")
+    @ApiOperation(hidden = true, value = "Delete a topic.", notes = "The topic cannot be deleted if delete is not forcefully and there's any active "
+            + "subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Topic has active producers/subscriptions") })
     public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalDeleteTopic(authoritative);
+        internalDeleteTopic(authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e1a98b..2256d88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -176,9 +176,10 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 404, message = "Partitioned topic does not exist") })
     public void deletePartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeletePartitionedTopic(authoritative);
+        internalDeletePartitionedTopic(authoritative, force);
     }
 
     @PUT
@@ -195,15 +196,16 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}")
-    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if there's any active subscription or producer connected to the it.")
+    @ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if delete is not forcefully and there's any active "
+            + "subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 412, message = "Topic has active producers/subscriptions") })
     public void deleteTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("topic") @Encoded String encodedTopic,
+            @PathParam("topic") @Encoded String encodedTopic, @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        internalDeleteTopic(authoritative);
+        internalDeleteTopic(authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fccc75b..7488204 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -134,4 +134,6 @@ public interface Topic {
     CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 
     CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema);
+
+    CompletableFuture<Void> deleteForcefully();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index cddfea3..de55df3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -410,10 +410,20 @@ public class NonPersistentTopic implements Topic {
 
     @Override
     public CompletableFuture<Void> delete() {
-        return delete(false);
+        return delete(false, false);
     }
 
-    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions) {
+    /**
+     * Forcefully closes all producers/consumers/replicators and deletes the topic.
+     * 
+     * @return
+     */
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
+        return delete(false, true);
+    }
+    
+    private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
         CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
 
         lock.writeLock().lock();
@@ -423,36 +433,62 @@ public class NonPersistentTopic implements Topic {
                 deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
                 return deleteFuture;
             }
-            if (USAGE_COUNT_UPDATER.get(this) == 0) {
-                isFenced = true;
 
+            CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
+            if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
+                producers.forEach(producer -> futures.add(producer.disconnect()));
+                subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+                FutureUtil.waitForAll(futures).thenRun(() -> {
+                    closeClientFuture.complete(null);
+                }).exceptionally(ex -> {
+                    log.error("[{}] Error closing clients", topic, ex);
+                    isFenced = false;
+                    closeClientFuture.completeExceptionally(ex);
+                    return null;
+                });
+            } else {
+                closeClientFuture.complete(null);
+            }
 
-                if (failIfHasSubscriptions) {
-                    if (!subscriptions.isEmpty()) {
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
-                        return deleteFuture;
-                    }
-                } else {
-                    subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
-                }
+            closeClientFuture.thenAccept(delete -> {
+
+                if (USAGE_COUNT_UPDATER.get(this) == 0) {
+                    isFenced = true;
+
+                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Error deleting topic", topic, ex);
-                        isFenced = false;
-                        deleteFuture.completeExceptionally(ex);
+                    if (failIfHasSubscriptions) {
+                        if (!subscriptions.isEmpty()) {
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
+                            return;
+                        }
                     } else {
-                        brokerService.removeTopicFromCache(topic);
-                        log.info("[{}] Topic deleted", topic);
-                        deleteFuture.complete(null);
+                        subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
                     }
-                });
-            } else {
-                deleteFuture.completeExceptionally(new TopicBusyException(
-                        "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
-            }
+
+                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+                        if (ex != null) {
+                            log.error("[{}] Error deleting topic", topic, ex);
+                            isFenced = false;
+                            deleteFuture.completeExceptionally(ex);
+                        } else {
+                            brokerService.removeTopicFromCache(topic);
+                            log.info("[{}] Topic deleted", topic);
+                            deleteFuture.complete(null);
+                        }
+                    });
+                } else {
+                    deleteFuture.completeExceptionally(new TopicBusyException(
+                            "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
+                }
+            }).exceptionally(ex -> {
+                deleteFuture.completeExceptionally(
+                        new TopicBusyException("Failed to close clients before deleting topic."));
+                return null;
+            });
         } finally {
             lock.writeLock().unlock();
         }
@@ -890,7 +926,7 @@ public class NonPersistentTopic implements Topic {
                                 gcIntervalInSeconds);
                     }
 
-                    stopReplProducers().thenCompose(v -> delete(true))
+                    stopReplProducers().thenCompose(v -> delete(true, false))
                             .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                             .exceptionally(e -> {
                                 if (e.getCause() instanceof TopicBusyException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d56bf55..574fab8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -673,7 +673,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
      * 
      * @return
      */
-    private CompletableFuture<Void> deleteForcefully() {
+    @Override
+    public CompletableFuture<Void> deleteForcefully() {
         return delete(false, true);
     }
     
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index adf5ce7..4fb54dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -58,11 +59,6 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
         super.shutdown();
     }
 
-    @DataProvider(name = "partitionedTopic")
-    public Object[][] partitionedTopicProvider() {
-        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
-    }
-
     /**
      * If local cluster is removed from the global namespace then all topics under that namespace should be deleted from
      * the cluster.
@@ -105,6 +101,33 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
         client2.close();
     }
 
+    @Test
+    public void testForcefullyTopicDeletion() throws Exception {
+        log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
+
+        final String namespace = "pulsar/removeClusterTest";
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+
+        final String topicName = "persistent://" + namespace + "/topic";
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
+                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        producer1.close();
+
+        admin1.persistentTopics().delete(topicName, true);
+
+        MockedPulsarServiceBaseTest
+                .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 5, 150);
+
+        Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+
+        client1.close();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index b65ffd0..6fee2ff 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -18,10 +18,5 @@
  */
 package org.apache.pulsar.client.admin;
 
-/**
- * @deprecated since 2.0. See {@link Topics}
- */
 @Deprecated
-public interface PersistentTopics extends Topics {
-
-}
+public interface PersistentTopics extends Topics {}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2d0d5e5..2d629c3 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -283,6 +283,22 @@ public interface Topics {
      *
      * @param topic
      *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *            
+     * @throws PulsarAdminException
+     */
+    void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException;
+    
+    /**
+     * Delete a partitioned topic.
+     * <p>
+     * It will also delete all the partitions of the topic if it exists.
+     * <p>
+     *
+     * @param topic
+     *            Topic name
+     *            
      * @throws PulsarAdminException
      */
     void deletePartitionedTopic(String topic) throws PulsarAdminException;
@@ -295,10 +311,36 @@ public interface Topics {
      *
      * @param topic
      *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *            
      * @return a future that can be used to track when the partitioned topic is deleted
      */
-    CompletableFuture<Void> deletePartitionedTopicAsync(String topic);
+    CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force);
+    
+    /**
+     * Delete a topic.
+     * <p>
+     * Delete a topic. The topic cannot be deleted if the force flag is disabled and there's any active subscription or producer connected to it. The force flag deletes the topic forcefully by closing all active producers and consumers.
+     * <p>
+     *
+     * @param topic
+     *            Topic name
+     * @param force
+     *            Delete topic forcefully
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws PreconditionFailedException
+     *             Topic has active subscriptions or producers
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void delete(String topic, boolean force) throws PulsarAdminException;
 
+    
     /**
      * Delete a topic.
      * <p>
@@ -318,20 +360,23 @@ public interface Topics {
      *             Unexpected error
      */
     void delete(String topic) throws PulsarAdminException;
-
+    
     /**
      * Delete a topic asynchronously.
      * <p>
-     * Delete a topic asynchronously. The topic cannot be deleted if there's any active subscription or producer
-     * connected to the it.
+     * Delete a topic asynchronously. The topic cannot be deleted if the force flag is disabled and there's any active
+     * subscription or producer connected to it. The force flag deletes the topic forcefully by closing all active
+     * producers and consumers.
      * <p>
      *
      * @param topic
      *            topic name
-     *
+     * @param force
+     *            Delete topic forcefully
+     * 
      * @return a future that can be used to track when the topic is deleted
      */
-    CompletableFuture<Void> deleteAsync(String topic);
+    CompletableFuture<Void> deleteAsync(String topic, boolean force);
 
     /**
      * Unload a topic.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0ccb16e..7a6b21c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -75,6 +75,8 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicStats;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -270,8 +272,13 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
 
     @Override
     public void deletePartitionedTopic(String topic) throws PulsarAdminException {
+        deletePartitionedTopic(topic, false);
+    }
+
+    @Override
+    public void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException {
         try {
-            deletePartitionedTopicAsync(topic).get();
+            deletePartitionedTopicAsync(topic, force).get();
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -281,16 +288,22 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
     }
 
     @Override
-    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
+    public CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitions");
+        path = path.queryParam("force", force);
         return asyncDeleteRequest(path);
     }
 
     @Override
     public void delete(String topic) throws PulsarAdminException {
+        delete(topic, false);
+    }
+
+    @Override
+    public void delete(String topic, boolean force) throws PulsarAdminException {
         try {
-            deleteAsync(topic).get();
+            deleteAsync(topic, force).get();
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -300,9 +313,10 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics
     }
 
     @Override
-    public CompletableFuture<Void> deleteAsync(String topic) {
+    public CompletableFuture<Void> deleteAsync(String topic, boolean force) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn);
+        path = path.queryParam("force", Boolean.toString(force));
         return asyncDeleteRequest(path);
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9f10be3..a5b1581 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -505,7 +505,7 @@ public class PulsarAdminToolTest {
         CmdTopics cmdTopics = new CmdTopics(admin);
 
         cmdTopics.run(split("delete persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
 
         cmdTopics.run(split("unload persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
@@ -556,7 +556,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
 
         cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
         verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
@@ -588,7 +588,7 @@ public class PulsarAdminToolTest {
         CmdPersistentTopics topics = new CmdPersistentTopics(admin);
 
         topics.run(split("delete persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).delete("persistent://myprop/clust/ns1/ds1", false);
 
         topics.run(split("unload persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).unload("persistent://myprop/clust/ns1/ds1");
@@ -639,7 +639,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
 
         topics.run(split("delete-partitioned-topic persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).deletePartitionedTopic("persistent://myprop/clust/ns1/ds1", false);
 
         topics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
         verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 5b65256..266ebc4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -239,11 +239,14 @@ public class CmdPersistentTopics extends CmdBase {
 
         @Parameter(description = "persistent://property/cluster/namespace/topic\n", required = true)
         private java.util.List<String> params;
+        
+        @Parameter(names = "--force", description = "Close all producer/consumer/replicator and delete topic forcefully")
+        private boolean force = false;
 
         @Override
         void run() throws Exception {
             String persistentTopic = validatePersistentTopic(params);
-            persistentTopics.deletePartitionedTopic(persistentTopic);
+            persistentTopics.deletePartitionedTopic(persistentTopic, force);
         }
     }
 
@@ -253,10 +256,13 @@ public class CmdPersistentTopics extends CmdBase {
         @Parameter(description = "persistent://property/cluster/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = "--force", description = "Close all producer/consumer/replicator and delete topic forcefully")
+        private boolean force = false;
+        
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            persistentTopics.delete(persistentTopic);
+            persistentTopics.delete(persistentTopic, force);
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 04d9690..2d486aa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -246,10 +246,14 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://property/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully")
+        private boolean force = false;
+
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            topics.deletePartitionedTopic(topic);
+            topics.deletePartitionedTopic(topic, force);
         }
     }
 
@@ -259,10 +263,14 @@ public class CmdTopics extends CmdBase {
         @Parameter(description = "persistent://property/namespace/topic\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Close all producer/consumer/replicator and delete topic forcefully")
+        private boolean force = false;
+
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
-            topics.delete(topic);
+            topics.delete(topic, force);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.