You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/13 09:40:54 UTC

[GitHub] [pulsar] zhanghaou commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

zhanghaou commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r469820223



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       checkTopicLevelPolicyEnable() is already called inside getTopicPolicies(), so the explicit call here is redundant and can be removed.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
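       Same as above: checkTopicLevelPolicyEnable() is already called inside getTopicPolicies(), so the explicit call here can be removed.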

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic asynchronously.

Review comment:
       This Javadoc needs to be corrected: "for all the topics on a topic" should read "on a topic".

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Retention Quota must exceed configured backlog quota for topic. " +
+                            "Please increase retention quota and retry");
+        }
+        topicPolicies.setRetentionPolicies(retention);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveRetention() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
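       Same as above: checkTopicLevelPolicyEnable() is already called inside getTopicPolicies(), so the explicit call here can be removed.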

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic asynchronously.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies retention);
+
+    /**
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Response example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    RetentionPolicies getRetention(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the retention configuration for a topic asynchronously.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<RetentionPolicies> getRetentionAsync(String topic);
+
+    /**
+     * Remove the retention configuration for all the topics on a topic.

Review comment:
       This Javadoc needs to be corrected: "for all the topics on a topic" should read "for a topic".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org