You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/08 18:30:58 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #7784: support topic level delayed delivery policy

315157973 opened a new pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784


   
   
   Master Issue: #2688 
   
   ### Motivation
   support topic level delayed delivery policy
   
   ### Modifications
   Support set/get/remove delayed delivery policy on topic level.
   
   ### Verifying this change
   
   Added Unit test to verify set/get/remove delayed delivery policy at Topic level work as expected when Topic level policy is enabled/disabled
   
   - org.apache.pulsar.broker.admin.AdminApiDelayedDelivery#testEnableAndDisableTopicDelayedDelivery
   - org.apache.pulsar.broker.admin.AdminApiDelayedDelivery#testEnableTopicDelayedDelivery
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7784: support topic level delayed delivery policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784#issuecomment-671307734


   @jianyun8023 Could you please also help review this PR?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie merged pull request #7784: support topic level delayed delivery policy

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7784: support topic level delayed delivery policy

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784#discussion_r467878432



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +578,30 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }

Review comment:
       return

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new TypeReference<List<String>>() {});
+            Map policiesMap = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policy properties");
+        } catch (IOException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "serialize the result failed");
+        }
+    }
+
+    protected void internalSetTopicPolicies(AsyncResponse asyncResponse, Map<String, String> policies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        try {
+            Map oldPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            oldPolicies.putAll(policies);
+            topicPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(oldPolicies), TopicPolicies.class);

Review comment:
       Please add the generic declaration of Map.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new TypeReference<List<String>>() {});
+            Map policiesMap = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), TopicPolicies.class);

Review comment:
       Please add the generic declaration of Map.
   ex `Map<String,Object>`

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -246,6 +248,59 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+    @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
+                                                              @PathParam("namespace") String namespace,
+                                                              @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
+            return new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
+                    , topicPolicies.getDelayedDeliveryEnabled());
+        }
+        return new DelayedDeliveryPolicies(config().getDelayedDeliveryTickTimeMillis(), config().isDelayedDeliveryEnabled());
+    }

Review comment:
       When `DelayedDeliveryPolicies` is not set, the admin module should not return to the default value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7784: support topic level delayed delivery policy

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784#discussion_r467850364



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new TypeReference<List<String>>() {});
+            Map policiesMap = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policy properties");
+        } catch (IOException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "serialize the result failed");
+        }
+    }
+
+    protected void internalSetTopicPolicies(AsyncResponse asyncResponse, Map<String, String> policies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        try {
+            Map oldPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            oldPolicies.putAll(policies);
+            topicPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(oldPolicies), TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            log.error("incorrect policies map", e);
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policies map");

Review comment:
       Please complete the `asyncResponse` with exception

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -246,6 +246,41 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/{policyProperties}/getTopicPolicies")
+    @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public TopicPolicies getTopicPolicies(

Review comment:
       It's better to keep consistent with the delayed delivery setting of the namespace. 
   
   ```
       @GET
       @Path("/{tenant}/{namespace}/delayedDelivery")
       @ApiOperation(value = "Get delayed delivery messages config on a namespace.")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
               @ApiResponse(code = 409, message = "Concurrent modification"), })
       public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
                                            @PathParam("namespace") String namespace) {
           validateNamespaceName(tenant, namespace);
           return internalGetDelayedDelivery();
       }
   
       @POST
       @Path("/{tenant}/{namespace}/delayedDelivery")
       @ApiOperation(value = "Set delayed delivery messages config on a namespace.")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
       public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
               @PathParam("namespace") String namespace,
               @ApiParam(value = "Delayed delivery policies for the specified namespace") DelayedDeliveryPolicies deliveryPolicies) {
           validateNamespaceName(tenant, namespace);
           internalSetDelayedDelivery(deliveryPolicies);
       }
   ```
   
   And we should also add deleteDelayedDeliveryPolicies method for clearing the topic level delayed delivery policy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org