You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/30 14:06:59 UTC
[pulsar] branch master updated: Add compaction threshold for topic
level (#7881)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff753c Add compaction threshold for topic level (#7881)
3ff753c is described below
commit 3ff753c67ff3090c82f67691e60a82643931d21d
Author: hangc0276 <ha...@163.com>
AuthorDate: Sun Aug 30 22:06:43 2020 +0800
Add compaction threshold for topic level (#7881)
Fix #7826
### Motivation
Support configuring the compaction threshold at the topic level.
This builds on the system-topic-based topic policies feature.
### Modifications
Support setting the compaction threshold at the topic level.
Support getting the compaction threshold at the topic level.
Support removing the compaction threshold at the topic level.
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 ++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 87 ++++++++++++++++++++
.../broker/service/persistent/PersistentTopic.java | 18 +++--
.../broker/admin/TopicPoliciesDisableTest.java | 20 +++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 38 +++++++++
.../org/apache/pulsar/client/admin/Topics.java | 94 ++++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 77 +++++++++++++++++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 51 +++++++++++-
.../pulsar/common/policies/data/TopicPolicies.java | 5 ++
9 files changed, 422 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0123aa7..a78ead7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3100,4 +3100,46 @@ public class PersistentTopicsBase extends AdminResource {
}
+ /**
+ * Look up the topic-level compaction threshold of the current topic.
+ *
+ * @return the configured threshold, or an empty Optional when the topic has no
+ * topic policies or no threshold is set on them
+ */
+ protected Optional<Long> internalGetCompactionThreshold() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ // Optional.map collapses a null getter result into an empty Optional.
+ Optional<TopicPolicies> currentPolicies = getTopicPolicies(topicName);
+ return currentPolicies.map(policies -> policies.getCompactionThreshold());
+ }
+
+ /**
+ * Store a topic-level compaction threshold for the current topic.
+ *
+ * @param compactionThreshold the new threshold; negative values are rejected
+ * @return the future of the topic policies update
+ * @throws RestException with PRECONDITION_FAILED when the threshold is negative
+ */
+ protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
+ boolean negativeThreshold = compactionThreshold != null && compactionThreshold < 0;
+ if (negativeThreshold) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
+ }
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+
+ // Reuse the existing policies when present, otherwise start from a fresh instance.
+ Optional<TopicPolicies> existing = getTopicPolicies(topicName);
+ TopicPolicies updated = existing.isPresent() ? existing.get() : new TopicPolicies();
+ updated.setCompactionThreshold(compactionThreshold);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, updated);
+ }
+
+ /**
+ * Clear the topic-level compaction threshold of the current topic.
+ *
+ * @return the future of the topic policies update, or an already completed
+ * future when the topic has no topic policies
+ */
+ protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ checkTopicLevelPolicyEnable();
+ Optional<TopicPolicies> existing = getTopicPolicies(topicName);
+ if (existing.isPresent()) {
+ TopicPolicies policies = existing.get();
+ // A null threshold marks the topic-level value as unset.
+ policies.setCompactionThreshold(null);
+ return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
+ }
+ // Nothing stored for this topic, so there is nothing to remove.
+ return CompletableFuture.completedFuture(null);
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c00f270..c98867a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1784,5 +1784,92 @@ public class PersistentTopics extends PersistentTopicsBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+ @ApiOperation(value = "Get compaction threshold configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ try {
+ Optional<Long> threshold = internalGetCompactionThreshold();
+ if (threshold.isPresent()) {
+ asyncResponse.resume(threshold.get());
+ } else {
+ // No topic-level threshold configured: answer with an empty 204 response.
+ asyncResponse.resume(Response.noContent().build());
+ }
+ } catch (RestException restException) {
+ asyncResponse.resume(restException);
+ } catch (Exception unexpected) {
+ asyncResponse.resume(new RestException(unexpected));
+ }
+ }
+
+ /**
+ * REST endpoint that sets the topic-level compaction threshold.
+ * Resumes the response with 204 on success, or with the failure of the policy update.
+ */
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+ @ApiOperation(value = "Set compaction threshold configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Maximum number of backlog bytes before compaction is triggered for the specified topic") long compactionThreshold) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> {
+ if (ex != null) {
+ // Always log the cause; a RestException is passed through, anything else is wrapped.
+ log.error("Failed to set topic compaction threshold", ex);
+ asyncResponse.resume(ex instanceof RestException ? ex : new RestException(ex));
+ } else {
+ // The threshold is a plain long, so it is logged directly instead of being JSON-serialised.
+ log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName(),
+ compactionThreshold);
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
+ /**
+ * REST endpoint that removes the topic-level compaction threshold.
+ * Resumes the response with 204 on success, or with the failure of the policy update.
+ */
+ @DELETE
+ @Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
+ @ApiOperation(value = "Remove compaction threshold configuration for specified topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove topic compaction threshold", ex);
+ // Same handling as the set endpoint: pass a RestException through, wrap anything else.
+ asyncResponse.resume(ex instanceof RestException ? ex : new RestException(ex));
+ } else {
+ log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
+ clientAppId(),
+ tenant,
+ namespace,
+ topicName.getLocalName());
+ asyncResponse.resume(Response.noContent().build());
+ }
+ });
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3899313..7ea58e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1155,12 +1155,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
- Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, name.getNamespace()))
- .orElseThrow(() -> new KeeperException.NoNodeException());
+ Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name))
+ .map(TopicPolicies::getCompactionThreshold)
+ .orElse(null);
+ if (compactionThreshold == null) {
+ Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, name.getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
+ compactionThreshold = policies.compaction_threshold;
+ }
- if (isSystemTopic() || policies.compaction_threshold != 0
+ if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {
long backlogEstimate = 0;
@@ -1173,13 +1179,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
backlogEstimate = ledger.getEstimatedBacklogSize();
}
- if (backlogEstimate > policies.compaction_threshold) {
+ if (backlogEstimate > compactionThreshold) {
try {
triggerCompaction();
} catch (AlreadyRunningException are) {
log.debug("[{}] Compaction already running, so don't trigger again, "
+ "even though backlog({}) is over threshold({})",
- name, backlogEstimate, policies.compaction_threshold);
+ name, backlogEstimate, compactionThreshold);
}
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index 329c3c7..fa073fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -150,4 +150,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getStatusCode(), 405);
}
}
+
+ @Test
+ public void testCompactionThresholdDisabled() {
+ final long threshold = 10000L;
+ log.info("Compaction threshold: {} will set to the topic: {}", threshold, testTopic);
+
+ // Setting the threshold is expected to be rejected with 405.
+ try {
+ admin.topics().setCompactionThreshold(testTopic, threshold);
+ Assert.fail();
+ } catch (PulsarAdminException setFailure) {
+ Assert.assertEquals(setFailure.getStatusCode(), 405);
+ }
+
+ // Reading the threshold is expected to be rejected the same way.
+ try {
+ admin.topics().getCompactionThreshold(testTopic);
+ Assert.fail();
+ } catch (PulsarAdminException getFailure) {
+ Assert.assertEquals(getFailure.getStatusCode(), 405);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index ef16adb..e216661 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -377,4 +377,42 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(testTopic, true);
}
+
+ @Test
+ public void testGetSetCompactionThreshold() throws Exception {
+ final long expectedThreshold = 100000;
+ log.info("Compaction threshold: {} will set to the topic: {}", expectedThreshold, testTopic);
+
+ admin.topics().setCompactionThreshold(testTopic, expectedThreshold);
+ log.info("Compaction threshold set success on topic: {}", testTopic);
+
+ // Presumably gives the policy update time to become visible before reading it back.
+ Thread.sleep(3000);
+ long actualThreshold = admin.topics().getCompactionThreshold(testTopic);
+ log.info("Compaction threshold: {} get on topic: {}", actualThreshold, testTopic);
+ Assert.assertEquals(actualThreshold, expectedThreshold);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
+
+ @Test
+ public void testRemoveCompactionThreshold() throws Exception {
+ Long compactionThreshold = 100000L;
+ log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);
+
+ admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
+ log.info("Compaction threshold set success on topic: {}", testTopic);
+
+ Thread.sleep(3000);
+ Long getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+ log.info("Compaction threshold: {} get on topic: {}", getCompactionThreshold, testTopic);
+ Assert.assertEquals(getCompactionThreshold, compactionThreshold);
+
+ admin.topics().removeCompactionThreshold(testTopic);
+ Thread.sleep(3000);
+ // Re-read first, then log: the message must show the value observed after the removal,
+ // with one placeholder per argument.
+ getCompactionThreshold = admin.topics().getCompactionThreshold(testTopic);
+ log.info("Compaction threshold: {} get on topic: {} after remove", getCompactionThreshold, testTopic);
+ Assert.assertNull(getCompactionThreshold);
+
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 93d3192..6ca2390 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1913,4 +1913,98 @@ public interface Topics {
*/
CompletableFuture<Void> removeDispatchRateAsync(String topic) throws PulsarAdminException;
+ /**
+ * Get the compactionThreshold for a topic: the maximum number of backlog bytes
+ * the topic can have before compaction is triggered. 0 disables automatic compaction.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @return the topic-level compaction threshold, or {@code null} when none is set
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Long getCompactionThreshold(String topic) throws PulsarAdminException;
+
+ /**
+ * Get the compactionThreshold for a topic asynchronously: the maximum number of backlog bytes
+ * the topic can have before compaction is triggered. 0 disables automatic compaction.
+ * <p>
+ * Response example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @return a future completed with the topic-level compaction threshold; presumably
+ * {@code null} when none is set, as for the synchronous variant (TODO confirm)
+ */
+ CompletableFuture<Long> getCompactionThresholdAsync(String topic);
+
+ /**
+ * Set the compactionThreshold for a topic: the maximum number of backlog bytes
+ * the topic can have before compaction is triggered. 0 disables automatic compaction.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @param compactionThreshold
+ * maximum number of backlog bytes before compaction is triggered
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Topic does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException;
+
+ /**
+ * Set the compactionThreshold for a topic asynchronously: the maximum number of backlog bytes
+ * the topic can have before compaction is triggered. 0 disables automatic compaction.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>10000000</code>
+ * </pre>
+ *
+ * @param topic
+ * Topic name
+ * @param compactionThreshold
+ * maximum number of backlog bytes before compaction is triggered
+ * @return a future that completes when the request has been processed
+ */
+ CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold);
+
+ /**
+ * Remove the compactionThreshold configured for a topic.
+ *
+ * @param topic
+ * Topic name
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeCompactionThreshold(String topic) throws PulsarAdminException;
+
+ /**
+ * Remove the compactionThreshold configured for a topic asynchronously.
+ *
+ * @param topic
+ * Topic name
+ * @return a future that completes when the request has been processed
+ */
+ CompletableFuture<Void> removeCompactionThresholdAsync(String topic);
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ffd9339..ce4ce78 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2003,6 +2003,81 @@ public class TopicsImpl extends BaseResource implements Topics {
return asyncDeleteRequest(path);
}
+ @Override
+ public Long getCompactionThreshold(String topic) throws PulsarAdminException {
+ // Blocks on the async variant, bounded by the client's read timeout.
+ final CompletableFuture<Long> pending = getCompactionThresholdAsync(topic);
+ try {
+ return pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException timeout) {
+ throw new PulsarAdminException.TimeoutException(timeout);
+ } catch (InterruptedException interrupted) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(interrupted);
+ } catch (ExecutionException failed) {
+ throw (PulsarAdminException) failed.getCause();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
+ final CompletableFuture<Long> result = new CompletableFuture<>();
+ WebTarget target = topicPath(validateTopic(topic), "compactionThreshold");
+ // Bridge the JAX-RS callback onto the returned future.
+ asyncGetRequest(target,
+ new InvocationCallback<Long>() {
+ @Override
+ public void completed(Long threshold) {
+ result.complete(threshold);
+ }
+
+ @Override
+ public void failed(Throwable error) {
+ result.completeExceptionally(getApiException(error.getCause()));
+ }
+ });
+ return result;
+ }
+
+ @Override
+ public void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException {
+ // Blocks on the async variant, bounded by the client's read timeout.
+ final CompletableFuture<Void> pending = setCompactionThresholdAsync(topic, compactionThreshold);
+ try {
+ pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException timeout) {
+ throw new PulsarAdminException.TimeoutException(timeout);
+ } catch (InterruptedException interrupted) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(interrupted);
+ } catch (ExecutionException failed) {
+ throw (PulsarAdminException) failed.getCause();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold) {
+ // POST the threshold as a JSON body to the topic's compactionThreshold resource.
+ WebTarget target = topicPath(validateTopic(topic), "compactionThreshold");
+ return asyncPostRequest(target, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeCompactionThreshold(String topic) throws PulsarAdminException {
+ // Blocks on the async variant, bounded by the client's read timeout.
+ final CompletableFuture<Void> pending = removeCompactionThresholdAsync(topic);
+ try {
+ pending.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException timeout) {
+ throw new PulsarAdminException.TimeoutException(timeout);
+ } catch (InterruptedException interrupted) {
+ // Restore the interrupt flag before reporting the failure.
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(interrupted);
+ } catch (ExecutionException failed) {
+ throw (PulsarAdminException) failed.getCause();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeCompactionThresholdAsync(String topic) {
+ // DELETE the topic's compactionThreshold resource.
+ return asyncDeleteRequest(topicPath(validateTopic(topic), "compactionThreshold"));
+ }
- private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
+ private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b84794e..5b19576 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -123,6 +123,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+ jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
+ jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
+ jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
}
@Parameters(commandDescription = "Get the list of topics under a namespace.")
@@ -259,17 +262,17 @@ public class CmdTopics extends CmdBase {
@Parameters(commandDescription = "Create a non-partitioned topic.")
private class CreateNonPartitionedCmd extends CliCommand {
-
+
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List<String> params;
-
+
@Override
void run() throws Exception {
String topic = validateTopicName(params);
topics.createNonPartitionedTopic(topic);
}
}
-
+
@Parameters(commandDescription = "Update existing non-global partitioned topic. \n"
+ "\t\tNew updating number of partitions must be greater than existing number of partitions.")
private class UpdatePartitionedCmd extends CliCommand {
@@ -1162,4 +1165,46 @@ public class CmdTopics extends CmdBase {
admin.topics().removeDispatchRate(persistentTopic);
}
}
+
+ /** CLI command that prints the compaction threshold of a topic. */
+ @Parameters(commandDescription = "Get compaction threshold for a topic")
+ private class GetCompactionThreshold extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ final String topic = validatePersistentTopic(params);
+ final Long threshold = admin.topics().getCompactionThreshold(topic);
+ print(threshold);
+ }
+ }
+
+ /** CLI command that sets the compaction threshold of a topic from a size string. */
+ @Parameters(commandDescription = "Set compaction threshold for a topic")
+ private class SetCompactionThreshold extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--threshold", "-t" },
+ description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+ + "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
+ required = true)
+ private String threshold = "0";
+
+ @Override
+ void run() throws PulsarAdminException {
+ final String topic = validatePersistentTopic(params);
+ // Convert the size string (e.g. 10M) to a number before sending it.
+ final long thresholdBytes = validateSizeString(threshold);
+ admin.topics().setCompactionThreshold(topic, thresholdBytes);
+ }
+ }
+
+ /** CLI command that removes the compaction threshold of a topic. */
+ @Parameters(commandDescription = "Remove compaction threshold for a topic")
+ private class RemoveCompactionThreshold extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ final String topic = validatePersistentTopic(params);
+ admin.topics().removeCompactionThreshold(topic);
+ }
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 0e459c6..d56e283 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -50,6 +50,7 @@ public class TopicPolicies {
private Long delayedDeliveryTickTimeMillis = null;
private Boolean delayedDeliveryEnabled = null;
private DispatchRate dispatchRate = null;
+ private Long compactionThreshold = null;
public boolean isMaxUnackedMessagesOnConsumerSet() {
return maxUnackedMessagesOnConsumer != null;
@@ -102,4 +103,8 @@ public class TopicPolicies {
public boolean isDispatchRateSet() {
return dispatchRate != null;
}
+
+ /**
+ * Tell whether a compaction threshold has been configured in these topic policies.
+ *
+ * @return {@code true} if {@code compactionThreshold} is non-null
+ */
+ public boolean isCompactionThresholdSet() {
+ return compactionThreshold != null;
+ }
}