Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/21 12:49:19 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7598: Support configuring DeleteInactiveTopic setting in namespace policy

codelipenghui commented on a change in pull request #7598:
URL: https://github.com/apache/pulsar/pull/7598#discussion_r458050955



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -98,8 +99,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {
         this.producers = new ConcurrentHashMap<>();
         this.isFenced = false;
         this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
-        this.deleteWhileInactive =
-                brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+        this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());

Review comment:
       If a namespace-level policy has already been specified and the broker is then restarted, the inactive topic policy should apply the namespace-level policy.
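
A minimal sketch of the precedence described above, assuming the namespace-level policy is available as an optional value when the topic is constructed. `PolicySketch` and `EffectivePolicyResolver` are hypothetical stand-ins for illustration, not classes from this PR:

```java
import java.util.Optional;

// Hypothetical stand-in for InactiveTopicPolicies; only the flag discussed here.
final class PolicySketch {
    final boolean deleteWhileInactive;

    PolicySketch(boolean deleteWhileInactive) {
        this.deleteWhileInactive = deleteWhileInactive;
    }
}

// Hypothetical helper: picks the policy a topic should start with.
final class EffectivePolicyResolver {
    // Prefer the namespace-level policy when one is stored;
    // fall back to the broker-level default only when none exists.
    static PolicySketch resolve(Optional<PolicySketch> namespacePolicy, boolean brokerDefault) {
        return namespacePolicy.orElseGet(() -> new PolicySketch(brokerDefault));
    }

    public static void main(String[] args) {
        // Namespace policy disables deletion even though the broker default enables it.
        PolicySketch fromNamespace = resolve(Optional.of(new PolicySketch(false)), true);
        // No namespace policy stored: the broker default is used.
        PolicySketch fromBroker = resolve(Optional.empty(), true);
        System.out.println(fromNamespace.deleteWhileInactive + " " + fromBroker.deleteWhileInactive);
    }
}
```

With this ordering, a broker restart re-reads the stored namespace policy instead of silently reverting to the broker configuration.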

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -859,6 +860,30 @@ public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
         internalSetDelayedDelivery(deliveryPolicies);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/inactiveTopic")
+    @ApiOperation(value = "Get inactive topic messages config on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"), })
+    public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
+                                                              @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        return internalGetInactiveTopic();
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/inactiveTopic")

Review comment:
       ```suggestion
       @Path("/{tenant}/{namespace}/inactiveTopicPolicy")
       ```

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -1696,6 +1746,9 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
         jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
 
+        jcommander.addCommand("get-inactive-topic", new GetInactiveTopicPolicies());

Review comment:
       ```suggestion
           jcommander.addCommand("get-inactive-topic-policy", new GetInactiveTopicPolicies());
       ```

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -1696,6 +1746,9 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
         jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
 
+        jcommander.addCommand("get-inactive-topic", new GetInactiveTopicPolicies());
+        jcommander.addCommand("set-inactive-topic", new SetInactiveTopicPolicies());

Review comment:
       ```suggestion
           jcommander.addCommand("set-inactive-topic-policy", new SetInactiveTopicPolicies());
       ```

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -1014,6 +1016,54 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
+    private class GetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getInactiveTopicPolicies(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a namespace")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable inactive topic messages")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable inactive topic messages")
+        private boolean disable = false;
+
+        @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+                ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String deleteInactiveTopicsMaxInactiveDuration;
+
+        @Parameter(names = { "--delete-mode", "-m" }, description = "Disable inactive topic messages", required = true)
+        private String inactiveTopicDeleteMode;

Review comment:
       It would be better to list the valid options for `inactiveTopicDeleteMode` in the description, so that users know what to fill in.
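
One way to surface the options, sketched under assumptions: `DeleteModeSketch` is a stand-in for `InactiveTopicDeleteMode`, and its constant names are assumed rather than taken from this diff. `DeleteModeHelp` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Stand-in for InactiveTopicDeleteMode; the constant names are an assumption.
enum DeleteModeSketch {
    delete_when_no_subscriptions,
    delete_when_subscriptions_caught_up
}

// Hypothetical helper that keeps the help text and the parser in sync with the enum.
final class DeleteModeHelp {
    // Builds a description that lists every valid value.
    static String describe() {
        String options = Arrays.stream(DeleteModeSketch.values())
                .map(Enum::name)
                .collect(Collectors.joining(", "));
        return "Valid options are: [" + options + "]";
    }

    // Parses the user's input and repeats the valid options on failure.
    static DeleteModeSketch parse(String raw) {
        try {
            return DeleteModeSketch.valueOf(raw);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown delete mode '" + raw + "'. " + describe(), e);
        }
    }
}
```

Annotation values in Java must be compile-time constants, so the `description` attribute itself would need the options written out literally; a helper like this is more useful for the error message shown when parsing fails.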

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -1014,6 +1016,54 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Get the inactive topic policy for a namespace")
+    private class GetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getInactiveTopicPolicies(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a namespace")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable inactive topic messages")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable inactive topic messages")
+        private boolean disable = false;

Review comment:
       Using `deleteWhileInactive` here is more reasonable because the command name is `set-inactive-topic`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -859,6 +860,30 @@ public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
         internalSetDelayedDelivery(deliveryPolicies);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/inactiveTopic")

Review comment:
       ```suggestion
       @Path("/{tenant}/{namespace}/inactiveTopicPolicy")
       ```

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicPolicies.java
##########
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Definition of the inactive topic policy.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InactiveTopicPolicies {
+    private InactiveTopicDeleteMode inactiveTopicDeleteMode;
+    private int brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;

Review comment:
       ```suggestion
       private int maxInactiveDurationSeconds;
       ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
##########
@@ -829,11 +829,14 @@ public boolean isActive() {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC(int maxInactiveDurationSeconds, InactiveTopicDeleteMode inactiveTopicDeleteMode) {
+        if (!isDeleteWhileInactive()) {

Review comment:
       If we already use `InactiveTopicPolicies`, I don't think we need to pass `maxInactiveDurationSeconds` and `inactiveTopicDeleteMode` in the parameter list. Of course, if `InactiveTopicPolicies` is only used to maintain the namespace-level policy, we still need to pass the parameters.
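
A rough sketch of the first option, where the topic holds its own policy object and the GC check reads from it instead of receiving the values as arguments. All names here (`GcDeleteMode`, `GcPolicies`, `TopicSketch`, `isEligibleForGc`) are hypothetical, and the eligibility rule is simplified compared with what the real `checkGC` would do:

```java
// Hypothetical stand-in for InactiveTopicDeleteMode; constant names are assumed.
enum GcDeleteMode { delete_when_no_subscriptions, delete_when_subscriptions_caught_up }

// Hypothetical stand-in for InactiveTopicPolicies.
final class GcPolicies {
    final GcDeleteMode deleteMode;
    final int maxInactiveDurationSeconds;
    final boolean deleteWhileInactive;

    GcPolicies(GcDeleteMode deleteMode, int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
        this.deleteMode = deleteMode;
        this.maxInactiveDurationSeconds = maxInactiveDurationSeconds;
        this.deleteWhileInactive = deleteWhileInactive;
    }
}

// Simplified topic that owns its policies, so callers no longer pass them in.
final class TopicSketch {
    private volatile GcPolicies policies;
    private final int subscriptionCount;

    TopicSketch(GcPolicies policies, int subscriptionCount) {
        this.policies = policies;
        this.subscriptionCount = subscriptionCount;
    }

    // Called when the namespace-level policy changes.
    void updatePolicies(GcPolicies updated) {
        this.policies = updated;
    }

    // Only the observed idle time is passed; everything else comes from the policy object.
    boolean isEligibleForGc(int inactiveSeconds) {
        GcPolicies p = policies; // read once so the check sees one consistent snapshot
        if (!p.deleteWhileInactive) {
            return false;
        }
        if (p.deleteMode == GcDeleteMode.delete_when_no_subscriptions && subscriptionCount > 0) {
            return false;
        }
        return inactiveSeconds >= p.maxInactiveDurationSeconds;
    }
}
```

Keeping the three values in one object also means a namespace policy update replaces them together, rather than leaving the flag and the duration briefly out of step.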

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1628,11 +1628,17 @@ private boolean hasBacklogs() {
     }
 
     @Override
-    public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
-        if (!deleteWhileInactive) {
+    public void checkGC(int maxInactiveDurationSeconds, InactiveTopicDeleteMode inactiveTopicDeleteMode) {
+        if (!isDeleteWhileInactive()) {

Review comment:
       Same as the comment I left on `NonPersistentTopic`.



