You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/10 12:05:25 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

codelipenghui commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r467855263



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected void internalSetRetention(AsyncResponse asyncResponse,
+            RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            asyncResponse.resume(Response.noContent().build());
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,

Review comment:
       complete the asyncResponse here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Here also throw exceptions, you can pass the asyncResponse to this method and resume the asyncResponse in the `checkTopicLevelPolicyEnable `. Please check all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org