Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/07/13 08:53:19 UTC

[GitHub] [rocketmq] vergilyn opened a new issue, #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

vergilyn opened a new issue, #4602:
URL: https://github.com/apache/rocketmq/issues/4602

   ### FEATURE REQUEST
   
   #### 1. Please describe the feature you are requesting.
   Provide a way to obtain every `consumeExecutor` (ThreadPoolExecutor) created in `ConsumeMessageOrderlyService` / `ConsumeMessageConcurrentlyService`.
   
   #### 2. Provide any additional detail on your proposed use case for this feature.
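   Expectation: when the application shuts down, it should be possible to tell whether any consume tasks are still running (or waiting to run).
   
   Sometimes business requirements call for waiting until all consume tasks have finished normally before the application is shut down.
   
   **Note:** the consume logic may also depend on other resources; see: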
   - issue#2085: [Support graceful shutdown for push consumer](https://github.com/apache/rocketmq/issues/2085)
   - pull#2084: [[ISSUE#2085]support graceful shutdown push consumer](https://github.com/apache/rocketmq/pull/2084)
   
   However, simply setting `awaitTerminationMillisWhenShutdown` does not satisfy the requirement that all consume tasks finish normally.
   
   #### 3. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
   
   **nice-to-have.**
   
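   I considered listening for Spring's `ContextClosedEvent` in the business system, but it is hard to get hold of all the consumeExecutors (the only way I can think of so far is reflection),
   so `ThreadPoolExecutor#isTerminated()` cannot be used to determine whether all submitted tasks have completed.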
   
   #### 4. If there are some sub-tasks using -[] for each subtask and create a corresponding issue to map to the sub task:
   No
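
For illustration only, here is a minimal JDK-only sketch of the kind of check the request has in mind, assuming a reference to the `ThreadPoolExecutor` were available. The class `PendingWorkCheck` and the method `hasPendingWork` are hypothetical names, not part of RocketMQ. Note that `getActiveCount()` is documented as approximate, and that `isTerminated()` can only become true after `shutdown()` or `shutdownNow()` has been called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PendingWorkCheck {
    /** Approximate check: true if the pool is executing a task or has tasks queued. */
    public static boolean hasPendingWork(ThreadPoolExecutor executor) {
        return executor.getActiveCount() > 0 || !executor.getQueue().isEmpty();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // A task that blocks until released, standing in for a long-running consume task.
        executor.execute(() -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        started.await();
        System.out.println("pending while task is blocked: " + hasPendingWork(executor));
        System.out.println("isTerminated before shutdown: " + executor.isTerminated());

        release.countDown();
        executor.shutdown();
        boolean done = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("terminated after shutdown: " + (done && executor.isTerminated()));
    }
}
```

Because the pool state can change between the check and any action taken on it, a check like this is only a snapshot; it is reliable as a completion signal only once no new tasks can be submitted.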


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] vergilyn commented on issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

Posted by GitBox <gi...@apache.org>.
vergilyn commented on issue #4602:
URL: https://github.com/apache/rocketmq/issues/4602#issuecomment-1183942894

   
   @lizhiboo Sorry, I misunderstood before.
   
   What I want is already implemented by `awaitTerminationMillisWhenShutdown`, as long as `awaitTerminationMillisWhenShutdown` is set long enough.
   
   ```JAVA
       public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
           // Disable new tasks from being submitted.
           executor.shutdown();
           try {
               // Wait a while for existing tasks to terminate.
               if (!executor.awaitTermination(timeout, timeUnit)) {
                   executor.shutdownNow();
                   // Wait a while for tasks to respond to being cancelled.
                   if (!executor.awaitTermination(timeout, timeUnit)) {
                       log.warn(String.format("%s didn't terminate!", executor));
                   }
               }
           } catch (InterruptedException ie) {
               // (Re-)Cancel if current thread also interrupted.
               executor.shutdownNow();
               // Preserve interrupt status.
               Thread.currentThread().interrupt();
           }
       }
   ```
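
As a rough, self-contained illustration of how the helper above behaves when the timeout is long enough, the sketch below copies its logic into a hypothetical `GracefulShutdownDemo` class. `System.err` stands in for the `log` field, and the 50 ms sleep is only an assumed stand-in for message-consuming work; none of these names come from RocketMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownDemo {
    // Same logic as the helper quoted above; System.err replaces the logger.
    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, timeUnit)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(timeout, timeUnit)) {
                    System.err.println(executor + " didn't terminate!");
                }
            }
        } catch (InterruptedException ie) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Submits the given number of short tasks, shuts down gracefully, and returns how many completed. */
    public static int runAndCount(int tasks, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // assumed stand-in for consuming one message
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow(): give up without counting the task.
                    Thread.currentThread().interrupt();
                }
            });
        }
        shutdownGracefully(executor, timeoutMillis, TimeUnit.MILLISECONDS);
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed with a 5 s timeout: " + runAndCount(4, 5000));
    }
}
```

With a timeout far longer than the queued work, every submitted task finishes before the executor terminates; with a timeout that is too short, `shutdownNow()` interrupts running tasks and discards queued ones.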




[GitHub] [rocketmq] djl394922860 commented on issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

Posted by GitBox <gi...@apache.org>.
djl394922860 commented on issue #4602:
URL: https://github.com/apache/rocketmq/issues/4602#issuecomment-1183936790

   > @vergilyn IMO, depending on the consumeExecutor's status is not correct either: while the consumeExecutor is still running, its status is always changing. If `awaitTerminationMillisWhenShutdown` is set to 0, the `consumeExecutor.awaitTermination` method blocks until all tasks have completed execution after a shutdown request, or the current thread is interrupted. If the user does not call `thread.interrupt`, it will keep waiting until all tasks have completed.
   
   ```
   /**
   * Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
   */
   private long awaitTerminationMillisWhenShutdown = 0;
   ```
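
The "0 indicates no await" wording matches plain JDK behavior: `ExecutorService.awaitTermination` with a zero timeout returns immediately instead of blocking. The sketch below shows only that JDK behavior; how RocketMQ passes the field into its shutdown path is not shown in this thread, so treat that link as an assumption. `ZeroTimeoutDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ZeroTimeoutDemo {
    /** Returns what awaitTermination(0, ms) reports while one task is still running. */
    public static boolean awaitWithZeroTimeout() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // A task that keeps running until released.
            executor.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();

            executor.shutdown();
            // Zero timeout: does not wait for the running task.
            boolean terminated = executor.awaitTermination(0, TimeUnit.MILLISECONDS);

            // Clean up: let the task finish and wait for real termination.
            release.countDown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
            return terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("awaitTermination(0) returned: " + awaitWithZeroTimeout()); // prints false
    }
}
```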




[GitHub] [rocketmq] lizhiboo commented on issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

Posted by GitBox <gi...@apache.org>.
lizhiboo commented on issue #4602:
URL: https://github.com/apache/rocketmq/issues/4602#issuecomment-1183859428

   @vergilyn IMO, depending on the consumeExecutor's status is not correct either: while the consumeExecutor is still running, its status is always changing. If `awaitTerminationMillisWhenShutdown` is set to 0, the `consumeExecutor.awaitTermination` method blocks until all tasks have completed execution after a shutdown request, or the current thread is interrupted. If the user does not call `thread.interrupt`, it will keep waiting until all tasks have completed.




[GitHub] [rocketmq] vergilyn closed issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

Posted by GitBox <gi...@apache.org>.
vergilyn closed issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)
URL: https://github.com/apache/rocketmq/issues/4602




[GitHub] [rocketmq] djl394922860 commented on issue #4602: Support fetch All consumerExecutor's. (by ConsumeMessageOrderlyService or ConsumeMessageConcurrentlyService create.)

Posted by GitBox <gi...@apache.org>.
djl394922860 commented on issue #4602:
URL: https://github.com/apache/rocketmq/issues/4602#issuecomment-1184005993

   > @lizhiboo Sorry, I misunderstood before.
   > 
   > What I want is already implemented by `awaitTerminationMillisWhenShutdown`, as long as `awaitTerminationMillisWhenShutdown` is set long enough.
   > 
   > (In the scenario I described, it's really not necessary to get all of the consumeExecutors.)
   > 
   > ```java
   >     public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
   >         // Disable new tasks from being submitted.
   >         executor.shutdown();
   >         try {
   >             // Wait a while for existing tasks to terminate.
   >             if (!executor.awaitTermination(timeout, timeUnit)) {
   >                 executor.shutdownNow();
   >                 // Wait a while for tasks to respond to being cancelled.
   >                 if (!executor.awaitTermination(timeout, timeUnit)) {
   >                     log.warn(String.format("%s didn't terminate!", executor));
   >                 }
   >             }
   >         } catch (InterruptedException ie) {
   >             // (Re-)Cancel if current thread also interrupted.
   >             executor.shutdownNow();
   >             // Preserve interrupt status.
   >             Thread.currentThread().interrupt();
   >         }
   >     }
   > ```
   
   Yep, you can set this field to whatever value you want.

