You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/10 12:48:40 UTC

[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7784: support topic level delayed delivery policy

jianyun8023 commented on a change in pull request #7784:
URL: https://github.com/apache/pulsar/pull/7784#discussion_r467878432



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +578,30 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, DelayedDeliveryPolicies deliveryPolicies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }

Review comment:
       return

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new TypeReference<List<String>>() {});
+            Map policiesMap = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), TopicPolicies.class);
+        } catch (JsonProcessingException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "incorrect policy properties");
+        } catch (IOException e) {
+            throw new RestException(Status.NOT_ACCEPTABLE, "serialize the result failed");
+        }
+    }
+
+    protected void internalSetTopicPolicies(AsyncResponse asyncResponse, Map<String, String> policies) {
+        TopicPolicies topicPolicies = null;
+        try {
+            topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.error("Topic {} policies cache have not init.", topicName);
+            asyncResponse.resume(new RestException(e));
+        }
+        if (topicPolicies == null) {
+            topicPolicies = new TopicPolicies();
+        }
+        try {
+            Map oldPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            oldPolicies.putAll(policies);
+            topicPolicies = jsonMapper().readValue(jsonMapper().writeValueAsString(oldPolicies), TopicPolicies.class);

Review comment:
       Please add the generic declaration of Map.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -577,6 +580,56 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
         });
     }
 
+    protected TopicPolicies internalGetTopicPolicies(String properties) {
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        Map result = new HashMap();
+        try {
+            List<String> propertyList = jsonMapper().readValue(properties, new TypeReference<List<String>>() {});
+            Map policiesMap = jsonMapper().readValue(jsonMapper().writeValueAsString(topicPolicies), Map.class);
+            propertyList.forEach(property -> {
+                if (policiesMap.get(property) != null) {
+                    result.put(property, policiesMap.get(property));
+                }
+            });
+            return jsonMapper().readValue(jsonMapper().writeValueAsBytes(result), TopicPolicies.class);

Review comment:
       Please add the generic declaration of Map.
   ex `Map<String,Object>`

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -246,6 +248,59 @@ public void createNonPartitionedTopic(
         internalCreateNonPartitionedTopic(authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
+    @ApiOperation(value = "Get delayed delivery messages config on a topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error"),})
+    public DelayedDeliveryPolicies getDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
+                                                              @PathParam("namespace") String namespace,
+                                                              @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
+        if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
+            return new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
+                    , topicPolicies.getDelayedDeliveryEnabled());
+        }
+        return new DelayedDeliveryPolicies(config().getDelayedDeliveryTickTimeMillis(), config().isDelayedDeliveryEnabled());
+    }

Review comment:
       When `DelayedDeliveryPolicies` is not set, the admin module should not return to the default value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org