You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/28 06:26:22 UTC
[pulsar] branch master updated: Support configuring
DeleteInactiveTopic setting in namespace policy (#7598)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00e3089 Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
00e3089 is described below
commit 00e30895b22129d5189db1592851bdb62e8c498b
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jul 28 14:25:58 2020 +0800
Support configuring DeleteInactiveTopic setting in namespace policy (#7598)
### Motivation
Support configuring DeleteInactiveTopic setting in namespace policy
### Modifications
Only the two parameters `brokerDeleteInactiveTopicsMode` and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` support namespace policy. The parameters are changed to Map structure, the key is the namespace, and the value is the parameter value.
Such as: namespace1=delete_when_no_subscriptions, namespace2=delete_when_no_subscriptions.
In addition, there is a key name called `default`. If it is set, other namespaces that do not specify parameters will use this parameter.
Such as: default=delete_when_no_subscriptions
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 48 ++++--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 36 +++++
.../pulsar/broker/service/AbstractTopic.java | 28 +++-
.../pulsar/broker/service/BrokerService.java | 8 +-
.../org/apache/pulsar/broker/service/Topic.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 23 ++-
.../broker/service/persistent/PersistentTopic.java | 21 ++-
.../broker/service/persistent/SystemTopic.java | 2 +-
.../pulsar/broker/service/BrokerTestBase.java | 7 +-
.../broker/service/InactiveTopicDeleteTest.java | 165 +++++++++++++++++++++
.../service/PersistentTopicConcurrentTest.java | 4 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 48 ++++++
.../client/admin/internal/NamespacesImpl.java | 81 ++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 14 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 73 ++++++++-
...{PolicyName.java => InactiveTopicPolicies.java} | 34 ++---
.../pulsar/common/policies/data/Policies.java | 6 +-
.../pulsar/common/policies/data/PolicyName.java | 1 +
18 files changed, 543 insertions(+), 58 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ec2c1bd..c76eeda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -26,6 +26,8 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
+
+import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
@@ -90,6 +92,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
@@ -1832,39 +1835,64 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ protected InactiveTopicPolicies internalGetInactiveTopic() {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ if (policies.inactive_topic_policies == null) {
+ return new InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
+ , config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
+ , config().isBrokerDeleteInactiveTopicsEnabled());
+ } else {
+ return policies.inactive_topic_policies;
+ }
+ }
+
+ protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
+ internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
+ }
+ protected void internalSetPolicies(String fieldName, Object value){
try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
- policies.delayed_delivery_policies = delayedDeliveryPolicies;
+ Field field = Policies.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(policies, value);
+
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
- log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(),
- namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
+ log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
+ namespaceName, jsonMapper().writeValueAsString(value));
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(),
- namespaceName);
+ log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist", clientAppId(),
+ fieldName, namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
- log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification",
- clientAppId(), namespaceName);
+ log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
+ clientAppId(), fieldName, namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
- log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName,
- e);
+ log.error("[{}] Failed to update {} configuration for namespace {}", clientAppId(), fieldName
+ , namespaceName, e);
throw new RestException(e);
}
}
+ protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
+ validateSuperUserAccess();
+ validatePoliciesReadOnlyAccess();
+ internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies);
+ }
+
protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0a9ec96..f162207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
@@ -858,6 +859,41 @@ public class Namespaces extends NamespacesBase {
}
@GET
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Get inactive topic policies config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"), })
+ public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ return internalGetInactiveTopic();
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Remove inactive topic policies from a namespace.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void removeInactiveTopicPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetInactiveTopic( null);
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/inactiveTopicPolicies")
+ @ApiOperation(value = "Set inactive topic policies config on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
+ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Inactive topic policies for the specified namespace") InactiveTopicPolicies inactiveTopicPolicies) {
+ validateNamespaceName(tenant, namespace);
+ internalSetInactiveTopic(inactiveTopicPolicies);
+ }
+
+ @GET
@Path("/{tenant}/{namespace}/maxProducersPerTopic")
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4a02f1f..09a0521 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -63,8 +65,8 @@ public abstract class AbstractTopic implements Topic {
protected volatile boolean isFenced;
- // When set to false, this inactive topic can not be deleted
- protected boolean deleteWhileInactive;
+ // Inactive topic policies
+ protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
// Timestamp of when this topic was last seen active
protected volatile long lastActive;
@@ -98,8 +100,9 @@ public abstract class AbstractTopic implements Topic {
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
- this.deleteWhileInactive =
- brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+ this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
+ this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
+ this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
@@ -132,12 +135,14 @@ public abstract class AbstractTopic implements Topic {
return false;
}
+ @Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}
+ @Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
@@ -466,12 +471,23 @@ public abstract class AbstractTopic implements Topic {
}
public boolean isDeleteWhileInactive() {
- return deleteWhileInactive;
+ return this.inactiveTopicPolicies.isDeleteWhileInactive();
}
public void setDeleteWhileInactive(boolean deleteWhileInactive) {
- this.deleteWhileInactive = deleteWhileInactive;
+ this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
+
+ public InactiveTopicPolicies getInactiveTopicPolicies() {
+ return inactiveTopicPolicies;
+ }
+
+ public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
+ , int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
+ inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
+ inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
+ inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 34cca90..e6b1c81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -442,8 +442,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
- int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
- inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
+ inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), interval, interval,
TimeUnit.SECONDS);
}
@@ -1244,9 +1243,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return lookupRequestSemaphore.get();
}
- public void checkGC(int maxInactiveDurationInSec) {
- forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
- pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
+ public void checkGC() {
+ forEachTopic(Topic::checkGC);
}
public void checkMessageExpiry() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d20e700..4d14326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -124,7 +124,7 @@ public interface Topic {
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);
- void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);
+ void checkGC();
void checkInactiveSubscriptions();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index eb8d9d3..5a4d7cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
@@ -68,7 +69,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -145,6 +145,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
+ if (policies.inactive_topic_policies != null) {
+ inactiveTopicPolicies = policies.inactive_topic_policies;
+ }
setSchemaCompatibilityStrategy(policies);
schemaValidationEnforced = policies.schema_validation_enforced;
@@ -420,7 +423,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
/**
* Close this topic - close all producers and subscriptions associated with this topic
- *
+ *
* @param closeWithoutWaitingClientDisconnect
* don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
@@ -626,6 +629,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
return replicators.get(remoteCluster);
}
+ @Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
boolean hydratePublishers) {
@@ -755,6 +759,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
topicStatsStream.endObject();
}
+ @Override
public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
NonPersistentTopicStats stats = new NonPersistentTopicStats();
@@ -808,6 +813,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
return stats;
}
+ @Override
public PersistentTopicInternalStats getInternalStats() {
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
@@ -829,11 +835,12 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
- if (!deleteWhileInactive) {
+ public void checkGC() {
+ if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
+ int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive()) {
lastActive = System.nanoTime();
} else {
@@ -895,6 +902,14 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+
+ if (data.inactive_topic_policies != null) {
+ this.inactiveTopicPolicies = data.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+ resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
return checkReplicationAndRetryOnFailure();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8983238..377dc98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
@@ -255,6 +256,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
+ if (policies.inactive_topic_policies != null) {
+ inactiveTopicPolicies = policies.inactive_topic_policies;
+ }
maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(policies);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(policies);
@@ -1273,6 +1277,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return ledger;
}
+ @Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
@@ -1488,6 +1493,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return lastUpdatedAvgPublishRateInByte;
}
+ @Override
public TopicStats getStats(boolean getPreciseBacklog) {
TopicStats stats = new TopicStats();
@@ -1546,6 +1552,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return stats;
}
+ @Override
public PersistentTopicInternalStats getInternalStats() {
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
@@ -1628,11 +1635,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
- if (!deleteWhileInactive) {
+ public void checkGC() {
+ if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
+ InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.getInactiveTopicDeleteMode();
+ int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
@@ -1787,6 +1796,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
+ if (data.inactive_topic_policies != null) {
+ this.inactiveTopicPolicies = data.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
+ resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ , cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
@@ -1977,6 +1993,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}
+ @Override
public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 6720209..4b338a9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -46,7 +46,7 @@ public class SystemTopic extends PersistentTopic {
}
@Override
- public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
+ public void checkGC() {
// do nothing for system topic
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
index 314ccfa..ada2242 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java
@@ -55,7 +55,12 @@ public abstract class BrokerTestBase extends MockedPulsarServiceBaseTest {
void runGC() {
try {
- pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC(0)).get();
+ pulsar.getBrokerService().forEachTopic(topic -> {
+ if (topic instanceof AbstractTopic) {
+ ((AbstractTopic) topic).getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+ }
+ });
+ pulsar.getExecutor().submit(() -> pulsar.getBrokerService().checkGC()).get();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
} catch (Exception e) {
LOG.error("GC executor error", e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index c3d353b..143a4ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -19,10 +19,18 @@
package org.apache.pulsar.broker.service;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -68,6 +76,163 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
super.internalCleanup();
}
+ @Test(timeOut = 20000)
+ public void testTopicPolicyUpdateAndClean() throws Exception {
+ final String namespace = "prop/ns-abc";
+ final String namespace2 = "prop/ns-abc2";
+ final String namespace3 = "prop/ns-abc3";
+ List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+ super.resetConfig();
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
+ conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ InactiveTopicPolicies defaultPolicy = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions
+ , 1000, true);
+
+ super.baseSetup();
+
+ for (String ns : namespaceList) {
+ admin.namespaces().createNamespace(ns);
+ admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+ }
+
+ final String topic = "persistent://prop/ns-abc/testDeletePolicyUpdate";
+ final String topic2 = "persistent://prop/ns-abc2/testDeletePolicyUpdate";
+ final String topic3 = "persistent://prop/ns-abc3/testDeletePolicyUpdate";
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+
+ for (String tp : topics) {
+ admin.topics().createNonPartitionedTopic(tp);
+ }
+
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+ admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+ inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+ inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ admin.namespaces().setInactiveTopicPolicies(namespace3, inactiveTopicPolicies);
+
+ InactiveTopicPolicies policies;
+ //wait for zk
+ while (true) {
+ policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ if (policies.isDeleteWhileInactive()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));
+
+ admin.namespaces().removeInactiveTopicPolicies(namespace);
+ while (true) {
+ Thread.sleep(500);
+ policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ if (policies.getMaxInactiveDurationSeconds() == 1000) {
+ break;
+ }
+ }
+ Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
+ , defaultPolicy);
+
+ policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
+ Assert.assertTrue(policies.isDeleteWhileInactive());
+ Assert.assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ Assert.assertEquals(policies.getMaxInactiveDurationSeconds(), 1);
+ Assert.assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));
+
+ admin.namespaces().removeInactiveTopicPolicies(namespace2);
+ while (true) {
+ Thread.sleep(500);
+ policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+ if (policies.getMaxInactiveDurationSeconds() == 1000) {
+ break;
+ }
+ }
+ Assert.assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
+ , defaultPolicy);
+
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 20000)
+ public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
+ final String namespace = "prop/ns-abc";
+ final String namespace2 = "prop/ns-abc2";
+ final String namespace3 = "prop/ns-abc3";
+ List<String> namespaceList = Arrays.asList(namespace2, namespace3);
+
+ conf.setBrokerDeleteInactiveTopicsEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
+ super.baseSetup();
+
+ for (String ns : namespaceList) {
+ admin.namespaces().createNamespace(ns);
+ admin.namespaces().setNamespaceReplicationClusters(ns, Sets.newHashSet("test"));
+ }
+
+ final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ final String topic2 = "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ final String topic3 = "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig";
+ List<String> topics = Arrays.asList(topic, topic2, topic3);
+ //create producer/consumer and close
+ Map<String, String> topicToSub = new HashMap<>();
+ for (String tp : topics) {
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(tp).create();
+ String subName = "sub" + System.currentTimeMillis();
+ topicToSub.put(tp, subName);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(tp).subscriptionName(subName).subscribe();
+ for (int i = 0; i < 10; i++) {
+ producer.send("Pulsar".getBytes());
+ }
+ consumer.close();
+ producer.close();
+ Thread.sleep(1);
+ }
+ // namespace use delete_when_no_subscriptions, namespace2 use delete_when_subscriptions_caught_up
+ // namespace3 use default:delete_when_no_subscriptions
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,1,true);
+ admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
+ inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
+ admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
+
+ //wait for zk
+ while (true) {
+ InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ if (policies.isDeleteWhileInactive()) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ // topic should still exist
+ Thread.sleep(2000);
+ Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
+ Assert.assertTrue(admin.topics().getList(namespace2).contains(topic2));
+ Assert.assertTrue(admin.topics().getList(namespace3).contains(topic3));
+
+ // no backlog, trigger delete_when_subscriptions_caught_up
+ admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
+ Thread.sleep(2000);
+ Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+ // delete subscription, trigger delete_when_no_subscriptions
+ for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
+ admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
+ }
+ Thread.sleep(2000);
+ Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+ Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
+
+ super.internalCleanup();
+ }
+
@Test
public void testDeleteWhenNoBacklogs() throws Exception {
conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index cfc3937..12773b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -196,7 +196,9 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
// Thread.sleep(5,0);
log.info("{} forcing topic GC ", Thread.currentThread());
for (int i = 0; i < 2000; i++) {
- topic.checkGC(0, InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ topic.getInactiveTopicPolicies().setMaxInactiveDurationSeconds(0);
+ topic.getInactiveTopicPolicies().setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
+ topic.checkGC();
}
log.info("GC done..");
} catch (Exception e) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 7cb7349..f4315e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -2207,6 +2208,53 @@ public interface Namespaces {
String namespace, DelayedDeliveryPolicies delayedDeliveryPolicies);
/**
+ * Get the inactive deletion strategy for all topics within a namespace synchronously.
+ * @param namespace
+ * @return
+ * @throws PulsarAdminException
+ */
+ InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+ /**
+ * remove InactiveTopicPolicies from a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace);
+
+ /**
+ * Remove inactive topic policies from a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the inactive deletion strategy for all topics within a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace);
+
+ /**
+ * As same as setInactiveTopicPoliciesAsync,but it is synchronous.
+ * @param namespace
+ * @param inactiveTopicPolicies
+ */
+ void setInactiveTopicPolicies(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException;
+
+ /**
+ * You can set the inactive deletion strategy at the namespace level.
+ * Its priority is higher than the inactive deletion strategy at the broker level.
+ * All topics under this namespace will follow this strategy.
+ * @param namespace
+ * @param inactiveTopicPolicies
+ * @return
+ */
+ CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies);
+ /**
* Set the given subscription auth mode on all topics on a namespace.
*
* @param namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 1a9f010..83c6ce4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -956,6 +957,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void removeInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+ try {
+ removeInactiveTopicPoliciesAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public CompletableFuture<Void> removeBacklogQuotaAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "backlogQuota")
@@ -1775,6 +1798,64 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public InactiveTopicPolicies getInactiveTopicPolicies(String namespace) throws PulsarAdminException {
+ try {
+ return getInactiveTopicPoliciesAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
+ asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
+ @Override
+ public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
+ future.complete(inactiveTopicPolicies);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setInactiveTopicPolicies(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException {
+ try {
+ setInactiveTopicPoliciesAsync(namespace, inactiveTopicPolicies)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setInactiveTopicPoliciesAsync(
+ String namespace, InactiveTopicPolicies inactiveTopicPolicies) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "inactiveTopicPolicies");
+ return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
try {
return getMaxProducersPerTopicAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d5ca27e..18f0d39 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -76,6 +76,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@@ -406,6 +408,16 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-delayed-delivery myprop/clust/ns1"));
verify(mockNamespaces).getDelayedDelivery("myprop/clust/ns1");
+ namespaces.run(split("set-inactive-topic-policies myprop/clust/ns1 -e -t 1s -m delete_when_no_subscriptions"));
+ verify(mockNamespaces).setInactiveTopicPolicies("myprop/clust/ns1"
+ , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,true));
+
+ namespaces.run(split("get-inactive-topic-policies myprop/clust/ns1"));
+ verify(mockNamespaces).getInactiveTopicPolicies("myprop/clust/ns1");
+
+ namespaces.run(split("remove-inactive-topic-policies myprop/clust/ns1"));
+ verify(mockNamespaces).removeInactiveTopicPolicies("myprop/clust/ns1");
+
namespaces.run(split("clear-backlog myprop/clust/ns1 -force"));
verify(mockNamespaces).clearNamespaceBacklog("myprop/clust/ns1");
@@ -484,7 +496,7 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getDispatchRate("myprop/clust/ns1");
-
+
namespaces.run(split("set-publish-rate myprop/clust/ns1 -m 10 -b 20"));
verify(mockNamespaces).setPublishRate("myprop/clust/ns1", new PublishRate(10, 20));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 78d034f..661a56a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -658,7 +660,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(names = { "--relative-to-publish-rate",
"-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))\n", required = false)
private boolean relativeToPublishRate = false;
-
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
@@ -1014,6 +1016,67 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
+ private class GetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(admin.namespaces().getInactiveTopicPolicies(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove inactive topic policies from a namespace")
+ private class RemoveInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeInactiveTopicPolicies(namespace);
+ }
+ }
+
+ @Parameters(commandDescription = "Set the inactive topic policies on a namespace")
+ private class SetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
+ private boolean enableDeleteWhileInactive = false;
+
+ @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
+ private boolean disableDeleteWhileInactive = false;
+
+ @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+ ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+ private String deleteInactiveTopicsMaxInactiveDuration;
+
+ @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
+ ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
+ private String inactiveTopicDeleteMode;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+
+ if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+ throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
+ }
+ InactiveTopicDeleteMode deleteMode = null;
+ try {
+ deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+ }
+ admin.namespaces().setInactiveTopicPolicies(namespace, new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+ }
+ }
+
@Parameters(commandDescription = "Set the delayed delivery policy on a namespace")
private class SetDelayedDelivery extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1662,7 +1725,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
-
+
jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
jcommander.addCommand("delete-bookie-affinity-group", new DeleteBookieAffinityGroup());
@@ -1679,7 +1742,7 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
-
+
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
@@ -1696,6 +1759,10 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+ jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
+ jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
+ jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+
jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
similarity index 68%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
index 439ed7b..ac4607b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
@@ -16,30 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.common.policies.data;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
- * PolicyName authorization operations.
+ * Definition of the inactive topic policy.
*/
-public enum PolicyName {
- ALL,
- ANTI_AFFINITY,
- BACKLOG,
- COMPACTION,
- DELAYED_DELIVERY,
- DEDUPLICATION,
- MAX_CONSUMERS,
- MAX_PRODUCERS,
- MAX_UNACKED,
- OFFLOAD,
- PERSISTENCE,
- RATE,
- RETENTION,
- REPLICATION,
- REPLICATION_RATE,
- SCHEMA_COMPATIBILITY_STRATEGY,
- SUBSCRIPTION_AUTH_MODE,
- ENCRYPTION,
- TTL,
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InactiveTopicPolicies {
+ private InactiveTopicDeleteMode inactiveTopicDeleteMode;
+ private int maxInactiveDurationSeconds;
+ private boolean deleteWhileInactive;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 2946157..0fe0811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -74,6 +74,8 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public DelayedDeliveryPolicies delayed_delivery_policies = null;
@SuppressWarnings("checkstyle:MemberName")
+ public InactiveTopicPolicies inactive_topic_policies = null;
+ @SuppressWarnings("checkstyle:MemberName")
public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
@SuppressWarnings("checkstyle:MemberName")
@@ -120,7 +122,7 @@ public class Policies {
autoSubscriptionCreationOverride, persistence,
bundles, latency_stats_sample_rate,
message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies,
- encryption_required, delayed_delivery_policies,
+ encryption_required, delayed_delivery_policies, inactive_topic_policies,
subscription_auth_mode,
antiAffinityGroup, max_producers_per_topic,
max_consumers_per_topic, max_consumers_per_subscription,
@@ -158,6 +160,7 @@ public class Policies {
&& Objects.equals(retention_policies, other.retention_policies)
&& Objects.equals(encryption_required, other.encryption_required)
&& Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
+ && Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
&& Objects.equals(antiAffinityGroup, other.antiAffinityGroup)
&& max_producers_per_topic == other.max_producers_per_topic
@@ -218,6 +221,7 @@ public class Policies {
.add("deleted", deleted)
.add("encryption_required", encryption_required)
.add("delayed_delivery_policies", delayed_delivery_policies)
+ .add("inactive_topic_policies", inactive_topic_policies)
.add("subscription_auth_mode", subscription_auth_mode)
.add("max_producers_per_topic", max_producers_per_topic)
.add("max_consumers_per_topic", max_consumers_per_topic)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 439ed7b..8439a1f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -28,6 +28,7 @@ public enum PolicyName {
BACKLOG,
COMPACTION,
DELAYED_DELIVERY,
+ INACTIVE_TOPIC,
DEDUPLICATION,
MAX_CONSUMERS,
MAX_PRODUCERS,