Posted to commits@pulsar.apache.org by "tjiuming (via GitHub)" <gi...@apache.org> on 2023/02/07 08:00:41 UTC

[GitHub] [pulsar] tjiuming opened a new issue, #19448: PIP-245: Subscriptions expiration for NonPersistentTopic only

tjiuming opened a new issue, #19448:
URL: https://github.com/apache/pulsar/issues/19448

   ### Motivation
   
   Currently, in Pulsar, we have a configuration named `subscriptionExpirationTimeMinutes` to control the subscription expiration of `PersistentTopic` and `NonPersistentTopic`.
   
   When we set `subscriptionExpirationTimeMinutes` to a value greater than 0, it affects both `PersistentTopic` and `NonPersistentTopic`: their inactive subscriptions expire and are cleaned up automatically.
   
   For `NonPersistentTopic`, subscriptions can be cleaned up safely because we don't guarantee its data integrity. For `PersistentTopic`, however, cleaning up subscriptions automatically may lead to data loss.
   
   Because subscription expiration for both topic types is managed by the same configuration (`subscriptionExpirationTimeMinutes`), we can't control them independently.
   
   So I want to introduce a new configuration named `nonPersistentSubscriptionExpirationTimeMinutes` to manage subscription expiration for `NonPersistentTopic`.
   
   ### Goal
   
   Provide a new configuration to manage subscription expiration for `NonPersistentTopic` independently of `PersistentTopic`.
   
   ### API Changes
   
   Add a new field named `non_persistent_subscription_expiration_time_minutes` to [`Policies`](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java).
   This changes the request/response of the following APIs:
   1. [Namespaces#getPolicies(String namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L277)
   2. [Namespaces#getPoliciesAsync(String namespace)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L309)
   3. [Namespaces#createNamespace(String namespace, Policies policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L462)
   4. [Namespaces#createNamespaceAsync(String namespace, Policies policies)](https://github.com/apache/pulsar/blob/master/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java#L474)
   
   Add the following REST APIs to manage the value of `non_persistent_subscription_expiration_time_minutes`:
   1. getNonPersistentSubscriptionExpirationTime
   ```
       @GET
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Get the non-persistent subscription expiration time for the namespace")
       @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
       public void getNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                    @PathParam("tenant") String tenant,
                                                    @PathParam("namespace") String namespace) {
           // ignore...
       }
   ```
   2. setNonPersistentSubscriptionExpirationTime
   ```
       @POST
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Set non-persistent subscription expiration time in minutes for namespace")
       @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
               @ApiResponse(code = 412, message = "Invalid expiration time")})
       public void setNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                 @PathParam("tenant") String tenant,
                                                 @PathParam("namespace") String namespace,
                                                 @ApiParam(value =
                                                         "Expiration time in minutes for the specified namespace",
                                                         required = true) int expirationTime) {
           // ignore...
       }
   ```
   3. removeNonPersistentSubscriptionExpirationTime
   ```
       @DELETE
       @Path("/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime")
       @ApiOperation(value = "Remove non-persistent subscription expiration time for namespace")
       @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
               @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
       public void removeNonPersistentSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse,
                                                    @PathParam("tenant") String tenant,
                                                    @PathParam("namespace") String namespace) {
           // ignore...
       }
   ```
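   
   As a rough illustration of how a client might call the three endpoints above, the sketch below builds the corresponding HTTP requests with the JDK's `java.net.http` API. It only constructs the requests and does not send them. The `/admin/v2/namespaces` prefix, the JSON integer request body, and the class and method names are assumptions made for this example; the proposal itself only specifies the `/{tenant}/{namespace}/nonPersistentSubscriptionExpirationTime` path.
   
   ```java
   import java.net.URI;
   import java.net.http.HttpRequest;

   public class NonPersistentExpirationRequests {
       // Assumption: the new resource lives under the same prefix as the
       // existing namespace admin endpoints. The PIP does not state the prefix.
       static final String BASE_PATH = "/admin/v2/namespaces";

       // Builds the endpoint URI from the path template given in the proposal.
       static URI endpoint(String serviceUrl, String tenant, String namespace) {
           return URI.create(serviceUrl + BASE_PATH + "/" + tenant + "/" + namespace
                   + "/nonPersistentSubscriptionExpirationTime");
       }

       // GET: read the namespace-level value.
       static HttpRequest get(String serviceUrl, String tenant, String namespace) {
           return HttpRequest.newBuilder(endpoint(serviceUrl, tenant, namespace))
                   .GET()
                   .build();
       }

       // POST: set the value in minutes.
       // Assumption: the integer is sent as a bare JSON number in the body.
       static HttpRequest set(String serviceUrl, String tenant, String namespace, int minutes) {
           return HttpRequest.newBuilder(endpoint(serviceUrl, tenant, namespace))
                   .header("Content-Type", "application/json")
                   .POST(HttpRequest.BodyPublishers.ofString(Integer.toString(minutes)))
                   .build();
       }

       // DELETE: remove the namespace-level value.
       static HttpRequest remove(String serviceUrl, String tenant, String namespace) {
           return HttpRequest.newBuilder(endpoint(serviceUrl, tenant, namespace))
                   .DELETE()
                   .build();
       }

       public static void main(String[] args) {
           // Hypothetical broker address, tenant, and namespace.
           HttpRequest req = set("http://localhost:8080", "public", "default", 5);
           System.out.println(req.method() + " " + req.uri());
       }
   }
   ```
   
   Sending any of these requests would additionally need a `java.net.http.HttpClient` and whatever authentication the cluster requires.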
   
   ### Implementation
   
   Modify [`NonPersistentTopic#checkInactiveSubscriptions`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L1022) as follows:
   ```
           TopicName name = TopicName.get(topic);
           try {
               Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
                       .getPolicies(name.getNamespaceObject())
                       .orElseThrow(MetadataStoreException.NotFoundException::new);
               // Broker-level default, used when the namespace policy does not set a value.
               final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
                       .getNonPersistentSubscriptionExpirationTimeMinutes();
               final Integer nsExpirationTime = policies.non_persistent_subscription_expiration_time_minutes;
               final long expirationTimeMillis = TimeUnit.MINUTES
                       .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
               // A value of 0 or less disables expiration.
               if (expirationTimeMillis > 0) {
                   subscriptions.forEach((subName, sub) -> {
                       // Skip subscriptions that have a connected consumer or are replicated.
                       if ((sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected())
                               || sub.isReplicated()) {
                           return;
                       }
                       if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
                           sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration "
                                   + "with last active [{}]", topic, subName, sub.getLastActive()));
                       }
                   });
               }
           } catch (Exception e) {
               if (log.isDebugEnabled()) {
                   log.debug("[{}] Error getting policies", topic);
               }
           }
   ```
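   
   The decision logic in the snippet above can be isolated from the broker classes. The following is a minimal, self-contained sketch of that logic only: the class name, method names, and parameters are hypothetical and stand in for `Policies`, the broker configuration, and the subscription object. It assumes that a subscription with no dispatcher counts as having no connected consumer.
   
   ```java
   import java.util.concurrent.TimeUnit;

   public class ExpirationSketch {
       // The namespace-level value wins when it is set; otherwise the
       // broker-level default applies.
       static long effectiveExpirationMillis(Integer namespaceMinutes, int brokerDefaultMinutes) {
           int minutes = namespaceMinutes == null ? brokerDefaultMinutes : namespaceMinutes;
           return TimeUnit.MINUTES.toMillis(minutes);
       }

       // Mirrors the checks in the proposal: expiration is disabled for values
       // of 0 or less, and subscriptions with a connected consumer or with
       // replication enabled are never expired.
       static boolean isExpired(long nowMillis, long lastActiveMillis,
                                boolean consumerConnected, boolean replicated,
                                long expirationMillis) {
           if (expirationMillis <= 0) {
               return false;
           }
           if (consumerConnected || replicated) {
               return false;
           }
           return nowMillis - lastActiveMillis > expirationMillis;
       }

       public static void main(String[] args) {
           // No namespace override, broker default of 10 minutes.
           long expiration = effectiveExpirationMillis(null, 10);
           long now = System.currentTimeMillis();
           long lastActive = now - TimeUnit.MINUTES.toMillis(30);
           System.out.println("idle subscription expired: "
                   + isExpired(now, lastActive, false, false, expiration));
           System.out.println("connected subscription expired: "
                   + isExpired(now, lastActive, true, false, expiration));
       }
   }
   ```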
   
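   With this change, `NonPersistentTopic` reads only the new setting and no longer honors `subscriptionExpirationTimeMinutes`, so it is not compatible with the previous implementation.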
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] tjiuming commented on issue #19448: [DISCUSS]PIP-245: Make subscriptions of NonPersistentTopic non-durable

Posted by "tjiuming (via GitHub)" <gi...@apache.org>.
tjiuming commented on issue #19448:
URL: https://github.com/apache/pulsar/issues/19448#issuecomment-1429302110

   > The default subscription mode is durable, so will users see the warning log if they upgrade to the new client version?
   
   Yes
   
   > Why not just change `isDurable` to false for non-persistent topic subscriptions, and log the warning only if users explicitly set it to true?
   
   Because subscriptions on `PersistentTopic` and `NonPersistentTopic` both use `ConsumerConfigurationData`.
   If we change the default of `isDurable` to `false`, we would also need to add logic to change `isDurable` back to `true` when creating consumers on a `PersistentTopic`.
   
   @codelipenghui 




[GitHub] [pulsar] tjiuming commented on issue #19448: [DISCUSS]PIP-245: Subscriptions expiration for NonPersistentTopic only

Posted by "tjiuming (via GitHub)" <gi...@apache.org>.
tjiuming commented on issue #19448:
URL: https://github.com/apache/pulsar/issues/19448#issuecomment-1420367193

   Discuss thread: https://lists.apache.org/thread/2ltmyglnb25jy8nk58twkwbglws43bst




[GitHub] [pulsar] github-actions[bot] commented on issue #19448: PIP-245: Make subscriptions of NonPersistentTopic non-durable

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #19448:
URL: https://github.com/apache/pulsar/issues/19448#issuecomment-1499837609

   The issue had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] codelipenghui commented on issue #19448: [DISCUSS]PIP-245: Make subscriptions of NonPersistentTopic non-durable

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on issue #19448:
URL: https://github.com/apache/pulsar/issues/19448#issuecomment-1427569613

   The default subscription mode is durable, so will users see the warning log if they upgrade to the new client version? Why not just change `isDurable` to false for non-persistent topic subscriptions, and log the warning only if users explicitly set it to true?




[GitHub] [pulsar] dao-jun commented on issue #19448: PIP-245: Make subscriptions of NonPersistentTopic non-durable

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on issue #19448:
URL: https://github.com/apache/pulsar/issues/19448#issuecomment-1458386764

   Vote thread: https://lists.apache.org/thread/phrktmgln08pgpddvx8vvwzvgnq9kd0w

