You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/30 14:06:59 UTC

[pulsar] branch master updated: Add compaction threshold for topic level (#7881)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff753c  Add compaction threshold for topic level (#7881)
3ff753c is described below

commit 3ff753c67ff3090c82f67691e60a82643931d21d
Author: hangc0276 <ha...@163.com>
AuthorDate: Sun Aug 30 22:06:43 2020 +0800

    Add compaction threshold for topic level (#7881)
    
    Fix #7826
    
    ### Motivation
    Support compaction threshold on topic level.
    Based on the system topic function.
    
    ### Modifications
    Support set compaction threshold on topic level.
    Support get compaction threshold on topic level.
    Support remove compaction threshold on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 ++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 87 ++++++++++++++++++++
 .../broker/service/persistent/PersistentTopic.java | 18 +++--
 .../broker/admin/TopicPoliciesDisableTest.java     | 20 +++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 38 +++++++++
 .../org/apache/pulsar/client/admin/Topics.java     | 94 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 77 +++++++++++++++++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 51 +++++++++++-
 .../pulsar/common/policies/data/TopicPolicies.java |  5 ++
 9 files changed, 422 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0123aa7..a78ead7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3100,4 +3100,46 @@ public class PersistentTopicsBase extends AdminResource {
 
     }
 
+    protected Optional<Long> internalGetCompactionThreshold() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
+    }
+
+    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
+        if (compactionThreshold != null && compactionThreshold < 0) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
+        }
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+            .orElseGet(TopicPolicies::new);
+        topicPolicies.setCompactionThreshold(compactionThreshold);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
+      validateAdminAccessForTenant(namespaceName.getTenant());
+      validatePoliciesReadOnlyAccess();
+      if (topicName.isGlobal()) {
+          validateGlobalNamespaceOwnership(namespaceName);
+      }
+      checkTopicLevelPolicyEnable();
+      Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+      if (!topicPolicies.isPresent()) {
+          return CompletableFuture.completedFuture(null);
+      }
+      topicPolicies.get().setCompactionThreshold(null);
+      return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c00f270..c98867a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1784,5 +1784,92 @@ public class PersistentTopics extends PersistentTopicsBase {
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+    @ApiOperation(value = "Get compaction threshold configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+                                       @PathParam("tenant") String tenant,
+                                       @PathParam("namespace") String namespace,
+                                       @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<Long> compactionThreshold = internalGetCompactionThreshold();
+            if (!compactionThreshold.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(compactionThreshold.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+    @ApiOperation(value = "Set compaction threshold configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String encodedTopic,
+                                @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic dispatch rate", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic dispatch rate");
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName(),
+                        jsonMapper().writeValueAsString(compactionThreshold));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+    @ApiOperation(value = "Remove compaction threshold configuration for specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic dispatch rate", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
+                    clientAppId(),
+                    tenant,
+                    namespace,
+                    topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3899313..7ea58e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1155,12 +1155,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public void checkCompaction() {
         TopicName name = TopicName.get(topic);
         try {
-            Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, name.getNamespace()))
-                .orElseThrow(() -> new KeeperException.NoNodeException());
+            Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name))
+                .map(TopicPolicies::getCompactionThreshold)
+                .orElse(null);
+            if (compactionThreshold == null) {
+                Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, name.getNamespace()))
+                    .orElseThrow(() -> new KeeperException.NoNodeException());
+                compactionThreshold = policies.compaction_threshold;
+            }
 
 
-            if (isSystemTopic() || policies.compaction_threshold != 0
+            if (isSystemTopic() || compactionThreshold != 0
                 && currentCompaction.isDone()) {
 
                 long backlogEstimate = 0;
@@ -1173,13 +1179,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     backlogEstimate = ledger.getEstimatedBacklogSize();
                 }
 
-                if (backlogEstimate > policies.compaction_threshold) {
+                if (backlogEstimate > compactionThreshold) {
                     try {
                         triggerCompaction();
                     } catch (AlreadyRunningException are) {
                         log.debug("[{}] Compaction already running, so don't trigger again, "
                                   + "even though backlog({}) is over threshold({})",
-                                  name, backlogEstimate, policies.compaction_threshold);
+                                  name, backlogEstimate, compactionThreshold);
                     }
                 }
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 329c3c7..fa073fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -150,4 +150,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(e.getStatusCode(), 405);
         }
     }
+
+    @Test
+    public void testCompactionThresholdDisabled() {
+        Long compactionThreshold = 10000L;
+        log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
+
+        try {
+            admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getCompactionThreshold(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ef16adb..e216661 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -377,4 +377,42 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testGetSetCompactionThreshold() throws Exception {
+        long compactionThreshold = 100000;
+        log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
+
+        admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
+        log.info("Compaction threshold set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+        log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic);
+        Assert.assertEquals(getCompactionThreshold, compactionThreshold);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testRemoveCompactionThreshold() throws Exception {
+        Long compactionThreshold = 100000L;
+        log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
+
+        admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
+        log.info("Compaction threshold set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        Long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+        log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic);
+        Assert.assertEquals(getCompactionThreshold, compactionThreshold);
+
+        admin.topics().removeCompactionThreshold(testTopic);
+        Thread.sleep(3000);
+        log.info("Compaction threshold get on topic: {} after remove", getCompactionThreshold, testTopic);
+        getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+        Assert.assertNull(getCompactionThreshold);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 93d3192..6ca2390 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1913,4 +1913,98 @@ public interface Topics {
      */
     CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
 
+    /**
+     * Get the compactionThreshold for a topic. The maximum number of bytes
+     * can have before compaction is triggered. 0 disables.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Long getCompactionThreshold(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the compactionThreshold for a topic asynchronously. The maximum number of bytes
+     * can have before compaction is triggered. 0 disables.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Long> getCompactionThresholdAsync(String topic);
+
+    /**
+     * Set the compactionThreshold for a topic. The maximum number of bytes
+     * can have before compaction is triggered. 0 disables.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @param compactionThreshold
+     *            maximum number of backlog bytes before compaction is triggered
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException;
+
+    /**
+     * Set the compactionThreshold for a topic asynchronously. The maximum number of bytes
+     * can have before compaction is triggered. 0 disables.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @param compactionThreshold
+     *            maximum number of backlog bytes before compaction is triggered
+     */
+    CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold);
+
+    /**
+     * Remove the compactionThreshold for a topic.
+     * @param topic
+     *            Topic name
+     * @throws PulsarAdminException
+     *            Unexpected error
+     */
+    void removeCompactionThreshold(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the compactionThreshold for a topic asynchronously.
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> removeCompactionThresholdAsync(String topic);
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ffd9339..ce4ce78 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2003,6 +2003,81 @@ public class TopicsImpl extends BaseResource implements Topics {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public Long getCompactionThreshold(String topic) throws PulsarAdminException {
+        try {
+            return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "compactionThreshold");
+        final CompletableFuture<Long> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+            new InvocationCallback<Long>() {
+                @Override
+                public void completed(Long compactionThreshold) {
+                  future.complete(compactionThreshold);
+                }
+
+                @Override
+                public void failed(Throwable throwable) {
+                  future.completeExceptionally(getApiException(throwable.getCause()));
+                }
+            });
+        return future;
+    }
+
+    @Override
+    public void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException {
+        try {
+            setCompactionThresholdAsync(topic, compactionThreshold).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "compactionThreshold");
+        return asyncPostRequest(path, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeCompactionThreshold(String topic) throws PulsarAdminException {
+        try {
+            removeCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeCompactionThresholdAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "compactionThreshold");
+        return asyncDeleteRequest(path);
+    }
 
-    private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
+  private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b84794e..5b19576 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -123,6 +123,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
         jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+        jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
+        jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
+        jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -259,17 +262,17 @@ public class CmdTopics extends CmdBase {
 
     @Parameters(commandDescription = "Create a non-partitioned topic.")
     private class CreateNonPartitionedCmd extends CliCommand {
-    	
+
     	@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
     	private java.util.List<String> params;
-    	
+
     	@Override
     	void run() throws Exception {
     		String topic = validateTopicName(params);
     		topics.createNonPartitionedTopic(topic);
     	}
     }
-    
+
     @Parameters(commandDescription = "Update existing non-global partitioned topic. \n"
             + "\t\tNew updating number of partitions must be greater than existing number of partitions.")
     private class UpdatePartitionedCmd extends CliCommand {
@@ -1162,4 +1165,46 @@ public class CmdTopics extends CmdBase {
             admin.topics().removeDispatchRate(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get compaction threshold for a topic")
+    private class GetCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getCompactionThreshold(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set compaction threshold for a topic")
+    private class SetCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--threshold", "-t" },
+            description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+                + "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
+            required = true)
+        private String threshold = "0";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setCompactionThreshold(persistentTopic, validateSizeString(threshold));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove compaction threshold for a topic")
+    private class RemoveCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeCompactionThreshold(persistentTopic);
+        }
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 0e459c6..d56e283 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -50,6 +50,7 @@ public class TopicPolicies {
     private Long delayedDeliveryTickTimeMillis = null;
     private Boolean delayedDeliveryEnabled = null;
     private DispatchRate dispatchRate = null;
+    private Long compactionThreshold = null;
 
     public boolean isMaxUnackedMessagesOnConsumerSet() {
         return maxUnackedMessagesOnConsumer != null;
@@ -102,4 +103,8 @@ public class TopicPolicies {
     public boolean isDispatchRateSet() {
         return dispatchRate != null;
     }
+
+    public boolean isCompactionThresholdSet() {
+        return compactionThreshold != null;
+    }
 }