Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/09/29 06:38:11 UTC

[GitHub] [rocketmq] jzdayz opened a new issue #1499: Consumption is successful, but the offset value changes incorrectly.

jzdayz opened a new issue #1499:  Consumption is successful, but the offset value changes incorrectly.
URL: https://github.com/apache/rocketmq/issues/1499
 
 
   
   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   - What did you do (The steps to reproduce)?
   > If I switch to a different consumer group and run the consumer several times, messages may be consumed repeatedly. My listener always returns `CONSUME_SUCCESS`, so this should not happen. In the console I found that after all the data had been consumed (10 log lines printed), the consumer offset was not 10.
   - What did you expect to see?
   > No repeated consumption.
   - What did you see instead?
   > Repeated consumption
   2. Please tell us about your environment:
    macOS 10.13.6
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   #### Consumer
   ```java
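   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class Consumer {

       // Assumption: the report does not show where `log` comes from; an SLF4J logger is declared here.
       private static final Logger log = LoggerFactory.getLogger(Consumer.class);
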
       public static void main(String[] args) throws InterruptedException, MQClientException {
   
           // Instantiate with specified consumer group name.
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("eee");
   
           // Specify name server addresses.
           consumer.setNamesrvAddr("132.232.39.227:9876");
           
           // Subscribe to one or more topics to consume.
           consumer.subscribe("test-comsume", "*");
           // Register callback to execute on arrival of messages fetched from brokers.
           consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
               msgs.forEach(e->{
                   log.info(" receive : {}" , new String(e.getBody()));
               });
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           });
   
           //Launch the consumer instance.
           consumer.start();
   
           System.out.printf("Consumer Started.%n");
       }
   }
   ```
   
   #### SyncProducer
   ```java
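   import org.apache.rocketmq.client.producer.DefaultMQProducer;
   import org.apache.rocketmq.client.producer.SendResult;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.remoting.common.RemotingHelper;

   public class SyncProducer {
       public static void main(String[] args) throws Exception {
           // Instantiate with a producer group name.
           DefaultMQProducer producer = new DefaultMQProducer("1");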
           // Specify name server addresses.
           producer.setNamesrvAddr("132.232.39.227:9876");
           //Launch the instance.
           producer.start();
           for (int i = 0; i < 10; i++) {
               //Create a message instance, specifying topic, tag and message body.
               Message msg = new Message("test-comsume" /* Topic */,
                   "TagA" /* Tag */,
                   ("Hello RocketMQ " +
                       i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
               );
               // Send the message to one of the brokers.
               SendResult sendResult = producer.send(msg);
               System.out.printf("%s%n", sendResult);
           }
           // Shut down once the producer instance is no longer in use.
           producer.shutdown();
       }
   }
   ```
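
One possible reading of "the value of consumer-offset is not 10", which nothing in this report confirms: RocketMQ tracks consumer offsets per message queue rather than per topic, so if the topic has more than one queue, no single offset would reach 10 even when all 10 messages were consumed. The sketch below is plain Java with no RocketMQ calls. The queue count of 4 and the strict round-robin distribution are assumptions made for illustration, and `perQueueOffsets` is a hypothetical helper, not part of any RocketMQ API.

```java
import java.util.Arrays;

public class QueueOffsetSketch {

    // Hypothetical helper: spread messageCount messages round-robin over
    // queueCount queues and return how many messages land in each queue.
    // After full consumption, each count would be that queue's offset.
    static long[] perQueueOffsets(int messageCount, int queueCount) {
        long[] offsets = new long[queueCount];
        for (int i = 0; i < messageCount; i++) {
            offsets[i % queueCount]++;
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Assumption: 10 messages (as in the report) and 4 queues (illustrative only).
        long[] offsets = perQueueOffsets(10, 4);
        System.out.println("per-queue offsets: " + Arrays.toString(offsets));
        System.out.println("sum of offsets:    " + Arrays.stream(offsets).sum());
    }
}
```

Under these assumptions the per-queue values are each smaller than 10 while their sum is 10, so comparing the summed offsets across all queues of the topic with the number of messages sent would be a more meaningful check than looking at a single queue.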
   
