You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/28 06:26:22 UTC

[pulsar] branch master updated: Support configuring DeleteInactiveTopic setting in namespace policy (#7598)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00e3089  Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
00e3089 is described below

commit 00e30895b22129d5189db1592851bdb62e8c498b
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:25:58 2020 +0800

    Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
    
    ### Motivation
    
    Support configuring DeleteInactiveTopic setting in namespace policy
    
    ### Modifications
    
    Only two parameters, `brokerDeleteInactiveTopicsMode` and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds`, support namespace policy. The parameters are changed to a Map structure: the key is the namespace, and the value is the parameter value.
    For example: namespace1=delete_when_no_subscriptions, namespace2=delete_when_no_subscriptions.
    
    In addition, there is a key named `default`. If it is set, namespaces that do not specify their own value will use the value given for `default`.
    For example: default=delete_when_no_subscriptions
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  48 ++++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  36 +++++
 .../pulsar/broker/service/AbstractTopic.java       |  28 +++-
 .../pulsar/broker/service/BrokerService.java       |   8 +-
 .../org/apache/pulsar/broker/service/Topic.java    |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  23 ++-
 .../broker/service/persistent/PersistentTopic.java |  21 ++-
 .../broker/service/persistent/SystemTopic.java     |   2 +-
 .../pulsar/broker/service/BrokerTestBase.java      |   7 +-
 .../broker/service/InactiveTopicDeleteTest.java    | 165 +++++++++++++++++++++
 .../service/PersistentTopicConcurrentTest.java     |   4 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  48 ++++++
 .../client/admin/internal/NamespacesImpl.java      |  81 ++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  14 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  73 ++++++++-
 ...{PolicyName.java => InactiveTopicPolicies.java} |  34 ++---
 .../pulsar/common/policies/data/Policies.java      |   6 +-
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 18 files changed, 543 insertions(+), 58 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ec2c1bd..c76eeda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -26,6 +26,8 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
+
+import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -90,6 +92,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
@@ -1832,39 +1835,64 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
-    protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+    protected InactiveTopicPolicies internalGetInactiveTopic() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        if (policies.inactive_topic_policies == null) {
+            return new InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
+                    , config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
+                    , config().isBrokerDeleteInactiveTopicsEnabled());
+        } else {
+            return policies.inactive_topic_policies;
+        }
+    }
+
+    protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
         validateSuperUserAccess();
         validatePoliciesReadOnlyAccess();
+        internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
+    }
 
+    protected void internalSetPolicies(String fieldName, Object value){
         try {
             Stat nodeStat = new Stat();
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
             Policies policies = jsonMapper().readValue(content, Policies.class);
 
-            policies.delayed_delivery_policies = delayedDeliveryPolicies;
+            Field field = Policies.class.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(policies, value);
+
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
-            log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(),
-                    namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
+            log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
+                    namespaceName, jsonMapper().writeValueAsString(value));
 
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(),
-                    namespaceName);
+            log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist", clientAppId(),
+                    fieldName, namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
-            log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification",
-                    clientAppId(), namespaceName);
+            log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
+                    clientAppId(), fieldName, namespaceName);
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (RestException pfe) {
             throw pfe;
         } catch (Exception e) {
-            log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName,
-                    e);
+            log.error("[{}] Failed to update {} configuration for namespace {}", clientAppId(), fieldName
+                    , namespaceName, e);
             throw new RestException(e);
         }
     }
 
+    protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+        internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies);
+    }
+
     protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
         checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0a9ec96..f162207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.slf4j.Logger;
@@ -858,6 +859,41 @@ public class Namespaces extends NamespacesBase {
     }
 
     @GET
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Get inactive topic policies config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"), })
+    public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
+                                                              @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetInactiveTopic();
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Remove inactive topic policies from a namespace.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeInactiveTopicPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetInactiveTopic( null);
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+    @ApiOperation(value = "Set inactive topic policies config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
+    public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Inactive topic policies for the specified namespace") InactiveTopicPolicies inactiveTopicPolicies) {
+        validateNamespaceName(tenant, namespace);
+        internalSetInactiveTopic(inactiveTopicPolicies);
+    }
+
+    @GET
     @Path("/{tenant}/{namespace}/maxProducersPerTopic")
     @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4a02f1f..09a0521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -63,8 +65,8 @@ public abstract class AbstractTopic implements Topic {
 
     protected volatile boolean isFenced;
 
-    // When set to false, this inactive topic can not be deleted
-    protected boolean deleteWhileInactive;
+    // Inactive topic policies
+    protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
 
     // Timestamp of when this topic was last seen active
     protected volatile long lastActive;
@@ -98,8 +100,9 @@ public abstract class AbstractTopic implements Topic {
         this.producers = new ConcurrentHashMap<>();
         this.isFenced = false;
         this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
-        this.deleteWhileInactive =
-                brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+        this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
+        this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
+        this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
         this.lastActive = System.nanoTime();
         Policies policies = null;
         try {
@@ -132,12 +135,14 @@ public abstract class AbstractTopic implements Topic {
         return false;
     }
 
+    @Override
     public void disableCnxAutoRead() {
         if (producers != null) {
             producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
         }
     }
 
+    @Override
     public void enableCnxAutoRead() {
         if (producers != null) {
             producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
@@ -466,12 +471,23 @@ public abstract class AbstractTopic implements Topic {
     }
 
     public boolean isDeleteWhileInactive() {
-        return deleteWhileInactive;
+        return this.inactiveTopicPolicies.isDeleteWhileInactive();
     }
 
     public void setDeleteWhileInactive(boolean deleteWhileInactive) {
-        this.deleteWhileInactive = deleteWhileInactive;
+        this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
     }
 
     private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
+
+    public InactiveTopicPolicies getInactiveTopicPolicies() {
+        return inactiveTopicPolicies;
+    }
+
+    public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
+            , int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
+        inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
+        inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 34cca90..e6b1c81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -442,8 +442,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     protected void startInactivityMonitor() {
         if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
             int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
-            int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
-            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
+            inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), interval, interval,
                     TimeUnit.SECONDS);
         }
 
@@ -1244,9 +1243,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return lookupRequestSemaphore.get();
     }
 
-    public void checkGC(int maxInactiveDurationInSec) {
-        forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
-            pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
+    public void checkGC() {
+        forEachTopic(Topic::checkGC);
     }
 
     public void checkMessageExpiry() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d20e700..4d14326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -124,7 +124,7 @@ public interface Topic {
 
     CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
 
-    void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);
+    void checkGC();
 
     void checkInactiveSubscriptions();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index eb8d9d3..5a4d7cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -68,7 +69,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -145,6 +145,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                     .orElseThrow(() -> new KeeperException.NoNodeException());
             isEncryptionRequired = policies.encryption_required;
             isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
+            if (policies.inactive_topic_policies != null) {
+                inactiveTopicPolicies = policies.inactive_topic_policies;
+            }
             setSchemaCompatibilityStrategy(policies);
 
             schemaValidationEnforced = policies.schema_validation_enforced;
@@ -420,7 +423,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     /**
      * Close this topic - close all producers and subscriptions associated with this topic
-     * 
+     *
      * @param closeWithoutWaitingClientDisconnect
      *            don't wait for client disconnect and forcefully close managed-ledger
      * @return Completable future indicating completion of close operation
@@ -626,6 +629,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         return replicators.get(remoteCluster);
     }
 
+    @Override
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
             StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
             boolean hydratePublishers) {
@@ -755,6 +759,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         topicStatsStream.endObject();
     }
 
+    @Override
     public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
 
         NonPersistentTopicStats stats = new NonPersistentTopicStats();
@@ -808,6 +813,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         return stats;
     }
 
+    @Override
     public PersistentTopicInternalStats getInternalStats() {
 
         PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
@@ -829,11 +835,12 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC() {
+        if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
         }
+        int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
         if (isActive()) {
             lastActive = System.nanoTime();
         } else {
@@ -895,6 +902,14 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
             producer.checkEncryption();
         });
         subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+
+        if (data.inactive_topic_policies != null) {
+            this.inactiveTopicPolicies = data.inactive_topic_policies;
+        } else {
+            ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                    , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+        }
         return checkReplicationAndRetryOnFailure();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8983238..377dc98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -255,6 +256,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
 
             schemaValidationEnforced = policies.schema_validation_enforced;
+            if (policies.inactive_topic_policies != null) {
+                inactiveTopicPolicies = policies.inactive_topic_policies;
+            }
 
             maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(policies);
             maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(policies);
@@ -1273,6 +1277,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return ledger;
     }
 
+    @Override
     public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
             ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
 
@@ -1488,6 +1493,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return lastUpdatedAvgPublishRateInByte;
     }
 
+    @Override
     public TopicStats getStats(boolean getPreciseBacklog) {
 
         TopicStats stats = new TopicStats();
@@ -1546,6 +1552,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return stats;
     }
 
+    @Override
     public PersistentTopicInternalStats getInternalStats() {
         PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
 
@@ -1628,11 +1635,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC() {
+        if (!isDeleteWhileInactive()) {
             // This topic is not included in GC
             return;
         }
+        InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.getInactiveTopicDeleteMode();
+        int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
         if (isActive(deleteMode)) {
             lastActive = System.nanoTime();
         } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
@@ -1787,6 +1796,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
             delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
         }
+        if (data.inactive_topic_policies != null) {
+            this.inactiveTopicPolicies = data.inactive_topic_policies;
+        } else {
+            ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                    , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+        }
 
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
 
@@ -1977,6 +1993,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
     }
 
+    @Override
     public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
         return this.dispatchRateLimiter;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 6720209..4b338a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -46,7 +46,7 @@ public class SystemTopic extends PersistentTopic {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
+    public void checkGC() {
         // do nothing for system topic
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 314ccfa..ada2242 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -55,7 +55,12 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
 
     void runGC() {
         try {
-            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC(0)).get();
+            pulsar.getBrokerService().forEachTopic(topic -> {
+                if (topic instanceof AbstractTopic) {
+                    ((AbstractTopic) topic).getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+                }
+            });
+            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC()).get();
             Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
         } catch (Exception e) {
             LOG.error("GC executor error", e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index c3d353b..143a4ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -19,10 +19,18 @@
 package org.apache.pulsar.broker.service;
 
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Consumer;
 
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -68,6 +76,163 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 20000)
+    public void testTopicPolicyUpdateAndClean() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String namespace2 = "prop/ns-abc2";
+        final String namespace3 = "prop/ns-abc3";
+        List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+        super.resetConfig();
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+        conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+                , 1000, true);
+
+        super.baseSetup();
+
+        for (String ns : namespaceList) {
+            admin.namespaces().createNamespace(ns);
+            admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+        }
+
+        final String topic = "persistent://prop/ns-abc/testDeletePolicyUpdate";
+        final String topic2 = "persistent://prop/ns-abc2/testDeletePolicyUpdate";
+        final String topic3 = "persistent://prop/ns-abc3/testDeletePolicyUpdate";
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+        for (String tp : topics) {
+            admin.topics().createNonPartitionedTopic(tp);
+        }
+
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+        admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        admin.namespaces().setInactiveTopicPolicies(namespace3, inactiveTopicPolicies);
+
+        InactiveTopicPolicies policies;
+        // Wait for the namespace policy to reach the topic; deleteWhileInactive is already true by default, so poll on the duration (1000 -> 1).
+        while (true) {
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.getMaxInactiveDurationSeconds() == 1) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
+        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));
+
+        admin.namespaces().removeInactiveTopicPolicies(namespace); // the topic should fall back to the broker-level default
+        while (true) {
+            Thread.sleep(500);
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.getMaxInactiveDurationSeconds() == 1000) {
+                break;
+            }
+        }
+        Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
+                , defaultPolicy);
+
+        policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+        Assert.assertTrue(policies.isDeleteWhileInactive());
+        Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+        Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));
+
+        admin.namespaces().removeInactiveTopicPolicies(namespace2); // same fallback check for the second namespace
+        while (true) {
+            Thread.sleep(500);
+            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+            if (policies.getMaxInactiveDurationSeconds() == 1000) {
+                break;
+            }
+        }
+        Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
+                , defaultPolicy);
+
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 20000)
+    public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String namespace2 = "prop/ns-abc2";
+        final String namespace3 = "prop/ns-abc3";
+        List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+        conf.setBrokerDeleteInactiveTopicsEnabled(true);
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+        super.baseSetup();
+
+        for (String ns : namespaceList) {
+            admin.namespaces().createNamespace(ns);
+            admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+        }
+
+        final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        final String topic2 = "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        final String topic3 = "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig";
+        List<String> topics = Arrays.asList(topic, topic2, topic3);
+        // Create a producer and a consumer on each topic, publish 10 messages, then close both.
+        Map<String, String> topicToSub = new HashMap<>();
+        for (String tp : topics) {
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(tp).create();
+            String subName = "sub" + System.currentTimeMillis();
+            topicToSub.put(tp, subName);
+            Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+            for (int i = 0; i < 10; i++) {
+                producer.send("Pulsar".getBytes());
+            }
+            consumer.close();
+            producer.close();
+            Thread.sleep(1);
+        }
+        // namespace uses delete_when_no_subscriptions, namespace2 uses delete_when_subscriptions_caught_up;
+        // namespace3 gets no namespace-level policy and keeps the broker default (delete_when_no_subscriptions).
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+        admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+        admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+
+        // Wait for the policy (stored in zk) to reach the topic. NOTE(review): deleteWhileInactive may already be true via the broker config above - confirm this loop really waits.
+        while (true) {
+            InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
+                    .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            if (policies.isDeleteWhileInactive()) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+
+        // Subscriptions and backlog are untouched so far, so every topic should still exist.
+        Thread.sleep(2000);
+        Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+        Assert.assertTrue(admin.topics().getList(namespace2).contains(topic2));
+        Assert.assertTrue(admin.topics().getList(namespace3).contains(topic3));
+
+        // Skip topic2's backlog so its subscription is caught up; this should trigger delete_when_subscriptions_caught_up.
+        admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+        // Delete the remaining subscriptions (topic and topic3); this should trigger delete_when_no_subscriptions.
+        for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+            admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
+        }
+        Thread.sleep(2000);
+        Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+        Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
+
+        super.internalCleanup();
+    }
+
     @Test
     public void testDeleteWhenNoBacklogs() throws Exception {
         conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index cfc3937..12773b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -196,7 +196,9 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
                     // Thread.sleep(5,0);
                     log.info("{} forcing topic GC ", Thread.currentThread());
                     for (int i = 0; i < 2000; i++) {
-                        topic.checkGC(0, InactiveTopicDeleteMode.delete_when_no_subscriptions);
+                        topic.getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+                        topic.getInactiveTopicPolicies().setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+                        topic.checkGC();
                     }
                     log.info("GC done..");
                 } catch (Exception e) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 7cb7349..f4315e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -2207,6 +2208,53 @@ public interface Namespaces {
             String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
 
     /**
+     * Get the inactive topic policies (the inactive-topic deletion strategy) of a namespace synchronously.
+     * @param namespace Namespace name
+     * @return the inactive topic policies returned for the namespace
+     * @throws PulsarAdminException if the request fails
+     */
+    InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove the inactive topic policies from a namespace asynchronously.
+     * @param namespace Namespace name
+     * @return a future tracking the removal request
+     */
+    CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace);
+
+    /**
+     * Remove the inactive topic policies from a namespace synchronously.
+     * @param namespace Namespace name
+     * @throws PulsarAdminException if the request fails
+     */
+    void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the inactive topic policies (the inactive-topic deletion strategy) of a namespace asynchronously.
+     * @param namespace Namespace name
+     * @return a future that yields the inactive topic policies returned for the namespace
+     */
+    CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace);
+
+    /**
+     * Same as {@link #setInactiveTopicPoliciesAsync(String, InactiveTopicPolicies)}, but synchronous.
+     * @param namespace Namespace name
+     * @param inactiveTopicPolicies the inactive topic policies to apply to the namespace
+     */
+    void setInactiveTopicPolicies(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException;
+
+    /**
+     * Set the inactive topic deletion strategy at the namespace level, asynchronously.
+     * It takes priority over the inactive topic deletion strategy configured at the broker level.
+     * All topics under this namespace will follow this strategy.
+     * @param namespace Namespace name
+     * @param inactiveTopicPolicies the inactive topic policies to apply to the namespace
+     * @return a future tracking the update request
+     */
+    CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies);
+    /**
      * Set the given subscription auth mode on all topics on a namespace.
      *
      * @param namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 1a9f010..83c6ce4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -956,6 +957,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most readTimeoutMs.
+        try {
+            removeInactiveTopicPoliciesAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace) {
+        // Issue the async delete request against this namespace's "inactiveTopicPolicies" path.
+        final WebTarget target = namespacePath(NamespaceName.get(namespace), "inactiveTopicPolicies");
+        return asyncDeleteRequest(target);
+    }
+
+    @Override
     public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, "backlogQuota")
@@ -1775,6 +1798,64 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most readTimeoutMs.
+        try {
+            return getInactiveTopicPoliciesAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+        final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
+        asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+                    @Override
+                    public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
+                        future.complete(inactiveTopicPolicies);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setInactiveTopicPolicies(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException {
+        // Synchronous facade: wait on the async variant for at most readTimeoutMs.
+        try {
+            setInactiveTopicPoliciesAsync(namespace, inactiveTopicPolicies).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException timedOut) {
+            throw new PulsarAdminException.TimeoutException(timedOut);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(interrupted);
+        } catch (ExecutionException failure) {
+            throw (PulsarAdminException) failure.getCause();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+            String namespace, InactiveTopicPolicies inactiveTopicPolicies) {
+        // Post the policies as a JSON entity to this namespace's "inactiveTopicPolicies" path.
+        final WebTarget target = namespacePath(NamespaceName.get(namespace), "inactiveTopicPolicies");
+        return asyncPostRequest(target, Entity.entity(inactiveTopicPolicies, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
         try {
             return getMaxProducersPerTopicAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d5ca27e..18f0d39 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -76,6 +76,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -406,6 +408,16 @@ public class PulsarAdminToolTest {
         namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
         verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
 
+        namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
+        verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
+                , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
+
+        namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
+        verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
+
+        namespaces.run(split("remove-inactive-topic-policies myprop/clust/ns1"));
+        verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1");
+
         namespaces.run(split("clear-backlog myprop/clust/ns1 -force"));
         verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1");
 
@@ -484,7 +496,7 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("get-dispatch-rate myprop/clust/ns1"));
         verify(mockNamespaces).getDispatchRate("myprop/clust/ns1");
-        
+
         namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20"));
         verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new PublishRate(10, 20));
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 78d034f..661a56a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -658,7 +660,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(names = { "--relative-to-publish-rate",
                 "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
         private boolean relativeToPublishRate = false;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
@@ -1014,6 +1016,67 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
+    private class GetInactiveTopicPolicies extends CliCommand { // registered as "get-inactive-topic-policies"
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final String targetNamespace = validateNamespace(params);
+            print(admin.namespaces().getInactiveTopicPolicies(targetNamespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove inactive topic policies from a namespace")
+    private class RemoveInactiveTopicPolicies extends CliCommand { // registered as "remove-inactive-topic-policies"
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final String targetNamespace = validateNamespace(params);
+            admin.namespaces().removeInactiveTopicPolicies(targetNamespace);
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a namespace")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
+        private boolean enableDeleteWhileInactive = false;
+
+        @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
+        private boolean disableDeleteWhileInactive = false;
+
+        @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+                ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String deleteInactiveTopicsMaxInactiveDuration;
+
+        @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
+                ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
+        private String inactiveTopicDeleteMode;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            // Saturate at Integer.MAX_VALUE: a plain (int) cast would wrap very large inputs (e.g. 30000d) to a negative duration.
+            int maxInactiveDurationInSeconds = (int) Math.min(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration), Integer.MAX_VALUE);
+            if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+                throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
+            }
+            InactiveTopicDeleteMode deleteMode;
+            try {
+                deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+            } catch (IllegalArgumentException e) {
+                throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+            }
+            admin.namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(deleteMode, maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+        }
+    }
+
     @Parameters(commandDescription = "Set the delayed delivery policy on a namespace")
     private class SetDelayedDelivery extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -1662,7 +1725,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
-        
+
         jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
         jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
         jcommander.addCommand("delete-bookie-affinity-group", new DeleteBookieAffinityGroup());
@@ -1679,7 +1742,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
         jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
-        
+
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("get-publish-rate", new GetPublishRate());
 
@@ -1696,6 +1759,10 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
         jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
 
+        jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
+        jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
+        jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+
         jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
         jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
         jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
similarity index 68%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
index 439ed7b..ac4607b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
@@ -16,30 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.common.policies.data;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 /**
- * PolicyName authorization operations.
+ * Definition of the inactive topic policy.
  */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    MAX_UNACKED,
-    OFFLOAD,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    ENCRYPTION,
-    TTL,
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InactiveTopicPolicies {
+    private InactiveTopicDeleteMode inactiveTopicDeleteMode; // mode used when deleting inactive topics
+    private int maxInactiveDurationSeconds; // max inactivity, in seconds; topics inactive for longer are deleted
+    private boolean deleteWhileInactive; // whether delete-while-inactive is enabled
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 2946157..0fe0811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -74,6 +74,8 @@ public class Policies {
     @SuppressWarnings("checkstyle:MemberName")
     public DelayedDeliveryPolicies delayed_delivery_policies = null;
     @SuppressWarnings("checkstyle:MemberName")
+    public InactiveTopicPolicies inactive_topic_policies = null;
+    @SuppressWarnings("checkstyle:MemberName")
     public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
     @SuppressWarnings("checkstyle:MemberName")
@@ -120,7 +122,7 @@ public class Policies {
                 autoSubscriptionCreationOverride, persistence,
                 bundles, latency_stats_sample_rate,
                 message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies,
-                encryption_required, delayed_delivery_policies,
+                encryption_required, delayed_delivery_policies, inactive_topic_policies,
                 subscription_auth_mode,
                 antiAffinityGroup, max_producers_per_topic,
                 max_consumers_per_topic, max_consumers_per_subscription,
@@ -158,6 +160,7 @@ public class Policies {
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
                     && Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
+                    && Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
                     && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
                     && Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
                     && max_producers_per_topic == other.max_producers_per_topic
@@ -218,6 +221,7 @@ public class Policies {
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
                 .add("delayed_delivery_policies", delayed_delivery_policies)
+                .add("inactive_topic_policies", inactive_topic_policies)
                 .add("subscription_auth_mode", subscription_auth_mode)
                 .add("max_producers_per_topic", max_producers_per_topic)
                 .add("max_consumers_per_topic", max_consumers_per_topic)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 439ed7b..8439a1f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -28,6 +28,7 @@ public enum PolicyName {
     BACKLOG,
     COMPACTION,
     DELAYED_DELIVERY,
+    INACTIVE_TOPIC,
     DEDUPLICATION,
     MAX_CONSUMERS,
     MAX_PRODUCERS,