You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/04 15:56:53 UTC

[GitHub] [pulsar] jianyun8023 opened a new pull request #7747: [Issue 2688]Support set retention on topic level.

jianyun8023 opened a new pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747


   ### Motivation
   Support set retention quota on topic level.
   Based on the system topic function.
   ###Modifications
   Support get-retention on topic level.
   Support set-retention on topic level.
   
   ### Verifying this change
   This change added tests and can be verified as follows:
   `TopicTopicPoliciesQuotaTest.java`,`TopicPoliciesDisableTest.java`
     - *test set topic retention*
     - *test get topic retention*
     - *test disabled topic retention*
     - *test topic retention parameters are valid*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (`yes`)
     - Anything that affects deployment: (no)
   ### Documentation
   
     - *Does this pull request introduce a new feature? (`yes`)*
     - *If yes, how is the feature documented? (`docs` / `JavaDocs`)*
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-669088710


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-669212366


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-669485614


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-669098931


   @MarvinCai Could you please also help review this PR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] MarvinCai commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r466148847



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1032,6 +1033,72 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Get retention config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalGetRetention(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Set retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
+    public void setRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalSetRetention(asyncResponse, retention);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Remove retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })

Review comment:
       minor: should remove this.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1032,6 +1033,72 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Get retention config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalGetRetention(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Set retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
+    public void setRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {

Review comment:
       minor: for specified topic

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1433,4 +1434,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeBacklogQuota(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic.

Review comment:
       Should this be set retention for all messages on a topic?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -86,123 +71,172 @@ public void cleanup() throws Exception {
     public void testSetBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
 
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuotaInManager, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
     public void testRemoveBacklogQuota() throws Exception {
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(backlogQuota, getBacklogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuota, backlogQuotaInManager);
 
-        admin.topics().removeBacklogQuota(backlogQuotaTopic);
-        getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        admin.topics().removeBacklogQuota(testTopic);
+        getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertNull(getBacklogQuota);
 
-        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
         log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
-                backlogQuotaTopic);
+                testTopic);
         Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testCheckQuota() throws Exception {
+    public void testCheckBlcklogQuota() throws Exception {
         RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
-        String namespace = TopicName.get(backlogQuotaTopic).getNamespace();
+        String namespace = TopicName.get(testTopic).getNamespace();
         admin.namespaces().setRetention(namespace, retentionPolicies);
 
         BacklogQuota backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testBacklogQuotaDisabled() throws Exception {
-        disableTopicLevelPolicies();
-        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
-
-        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+    public void testCheckRetention() throws Exception {
+        BacklogQuota backlogQuota =

Review comment:
       Is this backlog quota added by mistake.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -880,4 +884,65 @@ void run() throws PulsarAdminException {
             admin.topics().removeBacklogQuota(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get the retention policy for a topic")
+    private class GetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getRetention(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the retention policy for a topic")
+    private class SetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--time",
+                "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "

Review comment:
       I think the example here might confuse people, e.g. 3h, 2d, might make people think they can provide retention with different time unit, but what we expect here is only in minutes. 
   Same for retention size that we only expect in MB.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-668959075


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r472889095



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       okay, let me solve these problems




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r467882743



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Checked on the upper level. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-668999718


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-671644539


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r466305574



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -880,4 +884,65 @@ void run() throws PulsarAdminException {
             admin.topics().removeBacklogQuota(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get the retention policy for a topic")
+    private class GetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getRetention(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the retention policy for a topic")
+    private class SetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--time",
+                "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "

Review comment:
       I refer to the namespace cmd  `CmdNamespaces.SetRetention`. If you want to revise, I suggest open an issue for unified modification




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r466302129



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1032,6 +1033,72 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Get retention config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalGetRetention(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Set retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
+    public void setRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r467881929



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected void internalSetRetention(AsyncResponse asyncResponse,
+            RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            asyncResponse.resume(Response.noContent().build());
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,

Review comment:
       fix




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] zhanghaou commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r469820223



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       checkTopicLevelPolicyEnable() is also called by getTopicPolicies(), so we can delete it here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Same to the above.

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic asynchronously.

Review comment:
       Need to modify this doc.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,67 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Retention Quota must exceed configured backlog quota for topic. " +
+                            "Please increase retention quota and retry");
+        }
+        topicPolicies.setRetentionPolicies(retention);
+        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemoveRetention() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Same to the above.

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1476,4 +1477,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeMessageTTL(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration on a topic.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic asynchronously.
+     * <p/>
+     * Set the retention configuration on a topic. This operation requires Pulsar super-user access.
+     * <p/>
+     * Request parameter example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies retention);
+
+    /**
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     * Response example:
+     * <p/>
+     *
+     * <pre>
+     * <code>
+     * {
+     *     "retentionTimeInMinutes" : 60,            // how long to retain messages
+     *     "retentionSizeInMB" : 1024,              // retention backlog limit
+     * }
+     * </code>
+     * </pre>
+     *
+     * @param topic
+     *            Topic name
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic does not exist
+     * @throws ConflictException
+     *             Concurrent modification
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    RetentionPolicies getRetention(String topic) throws PulsarAdminException;
+
+    /**
+     * Get the retention configuration for a topic asynchronously.
+     * <p/>
+     * Get the retention configuration for a topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     */
+    CompletableFuture<RetentionPolicies> getRetentionAsync(String topic);
+
+    /**
+     * Remove the retention configuration for all the topics on a topic.

Review comment:
       Need to modify this doc.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r466304256



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -86,123 +71,172 @@ public void cleanup() throws Exception {
     public void testSetBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
 
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuotaInManager, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
     public void testRemoveBacklogQuota() throws Exception {
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(backlogQuota, getBacklogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuota, backlogQuotaInManager);
 
-        admin.topics().removeBacklogQuota(backlogQuotaTopic);
-        getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        admin.topics().removeBacklogQuota(testTopic);
+        getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertNull(getBacklogQuota);
 
-        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
         log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
-                backlogQuotaTopic);
+                testTopic);
         Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testCheckQuota() throws Exception {
+    public void testCheckBlcklogQuota() throws Exception {
         RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
-        String namespace = TopicName.get(backlogQuotaTopic).getNamespace();
+        String namespace = TopicName.get(testTopic).getNamespace();
         admin.namespaces().setRetention(namespace, retentionPolicies);
 
         BacklogQuota backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testBacklogQuotaDisabled() throws Exception {
-        disableTopicLevelPolicies();
-        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
-
-        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+    public void testCheckRetention() throws Exception {
+        BacklogQuota backlogQuota =

Review comment:
       This is a problem, I have modified it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r467855263



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
+                .map(TopicPolicies::getRetentionPolicies);
+        if (!retention.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        }else {
+            asyncResponse.resume(retention.get());
+        }
+    }
+
+    protected void internalSetRetention(AsyncResponse asyncResponse,
+            RetentionPolicies retention) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (retention == null) {
+            asyncResponse.resume(Response.noContent().build());
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+                .orElseGet(TopicPolicies::new);
+        BacklogQuota backlogQuota =
+                    topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
+        if (backlogQuota == null){
+            Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
+            backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
+        }
+        if(!checkBacklogQuota(backlogQuota, retention)){
+            log.warn(
+                    "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
+                    clientAppId(), topicName);
+            throw new RestException(Status.PRECONDITION_FAILED,

Review comment:
       complete the asyncResponse here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2156,6 +2158,96 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
         internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
     }
 
+    protected void internalGetRetention(AsyncResponse asyncResponse){
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();

Review comment:
       Here also throw exceptions, you can pass the asyncResponse to this method and resume the asyncResponse in the `checkTopicLevelPolicyEnable `. Please check all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on pull request #7747: [Issue 2688]Support set retention on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#issuecomment-669871937


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org