You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by Jaskey <gi...@git.apache.org> on 2017/02/16 05:33:34 UTC

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

GitHub user Jaskey opened a pull request:

    https://github.com/apache/incubator-rocketmq/pull/64

    [ROCKETMQ-102] When shutdown(), the persisted offet is not the latest consumed message, which may cause repeated messages.

    Solution: add interface for push consumer to accept await termination time to await consuming.
    
    JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-102

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Jaskey/incubator-rocketmq ROCKETMQ-102-shutdown-await

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rocketmq/pull/64.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #64
    
----
commit 7dd5f353eb926a6af4e9519a69077a463ea75b50
Author: Jaskey <li...@gmail.com>
Date:   2017-02-16T05:32:40Z

    [ROCKETMQ-102] When shutdown(), the persisted offet is not the latest consumed message, which may cause repeated messages.
    Add interface for push consumer to accept await termination time to await consuming,

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    @lizhanhui please refer to the test case and just switch the `shutdown(long milis)` method to `shutdown()` to find out the problem


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    @zhouxinyu 
    
    Please check the updated pr.
    
    Since shutdownNow needs developer to take respond to interrupts but actually they should not care about this, so I still remain using shutdown , and after timeout , the method will return and remains the very same flow.
    
    BTW, unit test has been updated using  `AssetThat`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by lizhanhui <gi...@git.apache.org>.
Github user lizhanhui commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    Can you provide some test data, say before/after applying this patch, how many duplications are found respectively?
    
    IMHO, we should make the API as concise as possible. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r101719487
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---
    @@ -461,7 +461,11 @@ public void start() throws MQClientException {
          */
         @Override
         public void shutdown() {
    -        this.defaultMQPushConsumerImpl.shutdown();
    +        shutdown(0);
    +    }
    +
    +    public void shutdown(long awaitTerminateMillis) {
    --- End diff --
    
    Since `MQPushConsumer` only has the method interface for `shutdown( )`, so we could consider pull `shutdown(long awaitTerminateMillis)` up to the parent interface or make `shutdown(long awaitTerminateMillis)` private and give a default input(non-zero) in `shutdown()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10234328/badge)](https://coveralls.io/builds/10234328)
    
    Coverage increased (+0.4%) to 31.919% when pulling **5667cdfdddf02cd397640347312a490cd53a34d9 on Jaskey:ROCKETMQ-102-shutdown-await** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by lizhanhui <gi...@git.apache.org>.
Github user lizhanhui commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    I know this issue has been brought up in the past. What I suggest here is providing some testing data to consolidate the rationality of this patch. By doing so, you'll find it easier to get developers convinced.
    
    Anyway, thanks a lot for bringing this issue back to attention.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10430718/badge)](https://coveralls.io/builds/10430718)
    
    Changes Unknown when pulling **67cbdc35e908c888976b0e4464ef87cedef88104 on Jaskey:ROCKETMQ-102-shutdown-await** into ** on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r103434431
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    add a new java file `ThreadUtils`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    Hi @Jaskey ,
    Thanks for this PR, I added two comments for your reference.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r102128193
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---
    @@ -461,7 +461,11 @@ public void start() throws MQClientException {
          */
         @Override
         public void shutdown() {
    -        this.defaultMQPushConsumerImpl.shutdown();
    +        shutdown(0);
    +    }
    +
    +    public void shutdown(long awaitTerminateMillis) {
    --- End diff --
    
    Good idea, it's ok for me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r102893550
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    May be we need a common method to shutdown executor gracefully, like:
    
    ```
    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
            executor.shutdown();
    
            try {
                if(!executor.awaitTermination(timeout, timeUnit)) {
                    executor.shutdownNow();
                    if(!executor.awaitTermination(timeout, timeUnit)) {
                        LOG.warn(String.format("%s didn\'t terminate!", new Object[]{executor}));
                    }
                }
            } catch (InterruptedException var5) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
    
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r101723508
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---
    @@ -93,8 +93,24 @@ public void run() {
         }
     
         public void shutdown() {
    +        shutdown(0);
    +    }
    +
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    Ok, I will update the pr accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10195457/badge)](https://coveralls.io/builds/10195457)
    
    Coverage decreased (-0.3%) to 31.215% when pulling **db9e92a87e4da5c9480cd0be29d0ddac9845280d on Jaskey:ROCKETMQ-102-shutdown-await** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r102897821
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    We can, but where do we put this method in, in a Common Util? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r101718137
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---
    @@ -93,8 +93,24 @@ public void run() {
         }
     
         public void shutdown() {
    +        shutdown(0);
    +    }
    +
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    Did you follow our code code guidelines[1] ? Below `if` block is recommended.
    ```
    if {
    }
    ```
    http://rocketmq.incubator.apache.org/docs/code-guidelines/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r104590381
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    `shutdown` is the first choice, `shutdownNow` will be called if timeout.
    
    And `develop` will be merged to `master` in next release, please refer to our new [branching model](http://rocketmq.apache.org/docs/branching-model). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10206021/badge)](https://coveralls.io/builds/10206021)
    
    Coverage increased (+0.5%) to 31.992% when pulling **b178d2fb1c138b8f89d8d670ee39daa96c72a2d1 on Jaskey:ROCKETMQ-102-shutdown-await** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    @lizhanhui 
    
    The problem exists for long, please review an old issue.
    https://github.com/alibaba/RocketMQ/issues/367
    
    The number of duplicated messages are depending on how many messages are consuming in the thread pool or in the pending queue. This should be very easy to meet when consumptions are takes long and with massive accumulation.
    
    The problem should be addressed since rocketmq should prevend duplication as much as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r102893082
  
    --- Diff: client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java ---
    @@ -52,6 +56,7 @@
     import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
     import org.apache.rocketmq.remoting.exception.RemotingException;
     import org.junit.After;
    +import org.junit.Assert;
    --- End diff --
    
    Hi, let's unify the assert tool and use `org.assertj.core.api.Assertions.assertThat`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    Repeated messages is a big problem for a message queue, and this is a known issue. When can this pr be reviewed and merged?
    @lizhanhui @zhouxinyu @vongosling 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10465726/badge)](https://coveralls.io/builds/10465726)
    
    Coverage increased (+0.3%) to 31.243% when pulling **3da9a7983d2171f09c24a930d367b17b227fad86 on Jaskey:ROCKETMQ-102-shutdown-await** into **e3f4251c91a73f4e51732bcb1690554ac5fb3096 on apache:develop**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r101723621
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---
    @@ -461,7 +461,11 @@ public void start() throws MQClientException {
          */
         @Override
         public void shutdown() {
    -        this.defaultMQPushConsumerImpl.shutdown();
    +        shutdown(0);
    +    }
    +
    +    public void shutdown(long awaitTerminateMillis) {
    --- End diff --
    
    @zhouxinyu  
    
    I know and agree to your concern, actually, as I my previous comment, maybe we could add an configuration for awaitMills, what do you think ? The default value could be 0 to keep the same behavior
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10189206/badge)](https://coveralls.io/builds/10189206)
    
    Coverage decreased (-0.3%) to 31.215% when pulling **db9e92a87e4da5c9480cd0be29d0ddac9845280d on Jaskey:ROCKETMQ-102-shutdown-await** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r104399253
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    Hi @Jaskey ,
    
    I added a ThreadUtils, please refer to [here](https://github.com/apache/incubator-rocketmq/commit/e3f4251c91a73f4e51732bcb1690554ac5fb3096).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #64: [ROCKETMQ-102] When shutdown(), the per...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/64#discussion_r104592589
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---
    @@ -92,10 +92,22 @@ public void run() {
             }
         }
     
    -    public void shutdown() {
    +    @Override
    +    public void shutdown(long awaitTerminateMillis) {
             this.stopped = true;
             this.scheduledExecutorService.shutdown();
             this.consumeExecutor.shutdown();
    +        //await to consume
    +        if (awaitTerminateMillis > 0) {
    +            try {
    +                this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
    +                if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
    --- End diff --
    
    @zhouxinyu In my option, rocketmq should have no right to interrupt what dev's business are doing, we may be doing some time-cost job which are doing transcation or inserting database, we should leave the task running if executor is still not terminated.
    
    Besides, since the old version has not termination millis, so 0 of termination millis is the default  behavior , shutdown now will cost task being interrupt/cancel immediately which is not proper in my opinion.
    
    I have updated the pr , but still using my old method, please review and let's discuss more about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    
    [![Coverage Status](https://coveralls.io/builds/10228800/badge)](https://coveralls.io/builds/10228800)
    
    Coverage increased (+0.2%) to 31.691% when pulling **d714c6c7aca67ab1de715e99112f551f00d60682 on Jaskey:ROCKETMQ-102-shutdown-await** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    @lizhanhui @zhouxinyu @vintagewang 
    
    What do you think my proposol of adding a field called `awaitMilisWhenShutdown` in `DefaultPushConsumer`, which will not need to add a `shutdown(long awaitMilis)` interface but one more config field.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #64: [ROCKETMQ-102] When shutdown(), the persisted ...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/64
  
    @lizhanhui @zhouxinyu 
    
    please review the updated pr, which remains the same interface of push consumer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---