You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/06 05:21:26 UTC

[GitHub] [pulsar] MarvinCai commented on a change in pull request #7747: [Issue 2688]Support set retention on topic level.

MarvinCai commented on a change in pull request #7747:
URL: https://github.com/apache/pulsar/pull/7747#discussion_r466148847



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1032,6 +1033,72 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Get retention config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalGetRetention(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Set retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
+    public void setRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalSetRetention(asyncResponse, retention);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Remove retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })

Review comment:
       minor: should remove this.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1032,6 +1033,72 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @Pa
         internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Get retention config on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void getRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            internalGetRetention(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/retention")
+    @ApiOperation(value = "Set retention configuration on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
+    public void setRetention(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {

Review comment:
       minor: for specified topic

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1433,4 +1434,142 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      *             Unexpected error
      */
     void removeBacklogQuota(String topic) throws PulsarAdminException;
+
+    /**
+     * Set the retention configuration for all the topics on a topic.

Review comment:
       Should this be set retention for all messages on a topic?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -86,123 +71,172 @@ public void cleanup() throws Exception {
     public void testSetBacklogQuota() throws Exception {
 
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
 
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuotaInManager, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
     public void testRemoveBacklogQuota() throws Exception {
         BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
-        log.info("Backlog quota set success on topic: {}", backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
+        log.info("Backlog quota set success on topic: {}", testTopic);
 
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {}", getBacklogQuota, testTopic);
         Assert.assertEquals(backlogQuota, getBacklogQuota);
 
         BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
-        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
-        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, backlogQuotaTopic);
+        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
+        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
         Assert.assertEquals(backlogQuota, backlogQuotaInManager);
 
-        admin.topics().removeBacklogQuota(backlogQuotaTopic);
-        getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        admin.topics().removeBacklogQuota(testTopic);
+        getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertNull(getBacklogQuota);
 
-        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(backlogQuotaTopic));
+        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
         log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
-                backlogQuotaTopic);
+                testTopic);
         Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testCheckQuota() throws Exception {
+    public void testCheckBlcklogQuota() throws Exception {
         RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
-        String namespace = TopicName.get(backlogQuotaTopic).getNamespace();
+        String namespace = TopicName.get(testTopic).getNamespace();
         admin.namespaces().setRetention(namespace, retentionPolicies);
 
         BacklogQuota backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
         try {
-            admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+            admin.topics().setBacklogQuota(testTopic, backlogQuota);
             Assert.fail();
         } catch (PulsarAdminException e) {
             Assert.assertEquals(e.getStatusCode(), 412);
         }
         Thread.sleep(3000);
         backlogQuota =
                 new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
-        admin.topics().setBacklogQuota(backlogQuotaTopic, backlogQuota);
+        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
+        admin.topics().setBacklogQuota(testTopic, backlogQuota);
         Thread.sleep(3000);
-        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(backlogQuotaTopic)
+        BacklogQuota getBacklogQuota = admin.topics().getBacklogQuotaMap(testTopic)
                 .get(BacklogQuota.BacklogQuotaType.destination_storage);
-        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, backlogQuotaTopic);
+        log.info("Backlog quota {} get on topic: {} after remove", getBacklogQuota, testTopic);
         Assert.assertEquals(getBacklogQuota, backlogQuota);
 
-        admin.topics().deletePartitionedTopic(backlogQuotaTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);
     }
 
     @Test
-    public void testBacklogQuotaDisabled() throws Exception {
-        disableTopicLevelPolicies();
-        admin.topics().createPartitionedTopic(backlogQuotaTopic, 2);
-
-        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
-        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, backlogQuotaTopic);
+    public void testCheckRetention() throws Exception {
+        BacklogQuota backlogQuota =

Review comment:
       Is this backlog quota added by mistake.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -880,4 +884,65 @@ void run() throws PulsarAdminException {
             admin.topics().removeBacklogQuota(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get the retention policy for a topic")
+    private class GetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getRetention(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the retention policy for a topic")
+    private class SetRetention extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--time",
+                "-t" }, description = "Retention time in minutes (or minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w). "

Review comment:
       I think the example here might confuse people, e.g. 3h, 2d, might make people think they can provide retention with different time unit, but what we expect here is only in minutes. 
   Same for retention size that we only expect in MB.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org