You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/10/21 03:50:05 UTC

[GitHub] [rocketmq] killGC opened a new issue #2367: consumer only consume part 'delay message'

killGC opened a new issue #2367:
URL: https://github.com/apache/rocketmq/issues/2367


   The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.
   
   Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.
   
   Please ensure that your bug report is clear and that it is complete. Otherwise, we may be unable to understand it or to reproduce it, either of which would prevent us from fixing the bug. We strongly recommend the report(bug report or feature request) could include some hints as the following:
   
   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   ```java
   public class MessageTest2 {
       @Test
       public void test1() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
           DefaultMQProducer mqProducer = new DefaultMQProducer("produce_hjc9");
   
           mqProducer.setNamesrvAddr("192.168.201.215:9876;192.168.201.216:9876");
           //mqProducer.setNamesrvAddr("192.168.66.223:9876");
   
           mqProducer.start();
   
   
           TradeMessageBO tradeMessageBO = new TradeMessageBO();
           tradeMessageBO.setOrderInfoId(123L);
           tradeMessageBO.setPayOrderId(234L);
           Message message = new Message("testDelay_hjc9", TradeMessageEnum.SAVE_ORDER_DELAY_30.getTag(), JSON.toJSONBytes(tradeMessageBO));
   
           for (int i =0; i<20; i++) {
   
               Thread.sleep(1000);
               message.setDelayTimeLevel(2);
               SendResult send = mqProducer.send(message);
               System.out.println(send);
           }
   
   
   
       }
   
       public static void main(String[] args) throws MQClientException {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("resumer_hjc9");
           consumer.setNamesrvAddr("192.168.201.215:9876;192.168.201.216:9876");
           //consumer.setNamesrvAddr("192.168.66.223:9876");
   
           consumer.subscribe("testDelay_hjc9","SAVE_ORDER_DELAY_30");
   
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
                   System.out.println(list.get(0));
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
   
           consumer.start();
   
       }
   }
   
   MessageTest2.main log:
   
   MessageExt [brokerName=broker-a, queueId=1, storeSize=288, queueOffset=5, sysFlag=0, bornTimestamp=1603244932906, bornHost=/192.168.66.197:64800, storeTimestamp=1603244389249, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA0A9E, commitLogOffset=212470430, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=6, CONSUME_START_TIME=1603244937924, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=1}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=2, storeSize=288, queueOffset=5, sysFlag=0, bornTimestamp=1603244933918, bornHost=/192.168.66.197:64800, storeTimestamp=1603244390255, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA0BBE, commitLogOffset=212470718, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=6, CONSUME_START_TIME=1603244938924, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=2}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=3, storeSize=288, queueOffset=5, sysFlag=0, bornTimestamp=1603244934922, bornHost=/192.168.66.197:64800, storeTimestamp=1603244391259, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA0CDE, commitLogOffset=212471006, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=6, CONSUME_START_TIME=1603244939929, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=3}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=0, storeSize=288, queueOffset=5, sysFlag=0, bornTimestamp=1603244939943, bornHost=/192.168.66.197:64800, storeTimestamp=1603244396279, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA1292, commitLogOffset=212472466, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=6, CONSUME_START_TIME=1603244944950, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=0}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=1, storeSize=288, queueOffset=6, sysFlag=0, bornTimestamp=1603244940947, bornHost=/192.168.66.197:64800, storeTimestamp=1603244397283, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA13B2, commitLogOffset=212472754, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=7, CONSUME_START_TIME=1603244945953, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=1}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=2, storeSize=288, queueOffset=6, sysFlag=0, bornTimestamp=1603244941951, bornHost=/192.168.66.197:64800, storeTimestamp=1603244398287, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA14D2, commitLogOffset=212473042, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=7, CONSUME_START_TIME=1603244946958, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=2}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=3, storeSize=288, queueOffset=6, sysFlag=0, bornTimestamp=1603244942955, bornHost=/192.168.66.197:64800, storeTimestamp=1603244399290, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA15F2, commitLogOffset=212473330, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=7, CONSUME_START_TIME=1603244947961, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=3}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=0, storeSize=288, queueOffset=6, sysFlag=0, bornTimestamp=1603244947977, bornHost=/192.168.66.197:64800, storeTimestamp=1603244404315, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA1BA6, commitLogOffset=212474790, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=7, CONSUME_START_TIME=1603244952985, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=0}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=1, storeSize=288, queueOffset=7, sysFlag=0, bornTimestamp=1603244948986, bornHost=/192.168.66.197:64800, storeTimestamp=1603244405322, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA1CC6, commitLogOffset=212475078, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=8, CONSUME_START_TIME=1603244953996, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=1}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=2, storeSize=288, queueOffset=7, sysFlag=0, bornTimestamp=1603244949990, bornHost=/192.168.66.197:64800, storeTimestamp=1603244406327, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA1DE6, commitLogOffset=212475366, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=8, CONSUME_START_TIME=1603244954997, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=2}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   MessageExt [brokerName=broker-a, queueId=3, storeSize=288, queueOffset=7, sysFlag=0, bornTimestamp=1603244950995, bornHost=/192.168.66.197:64800, storeTimestamp=1603244407331, storeHost=/192.168.201.215:10911, msgId=C0A8C9D700002A9F000000000CAA1F06, commitLogOffset=212475654, bodyCRC=2119923294, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='testDelay_hjc9', flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=testDelay_hjc9, MAX_OFFSET=8, CONSUME_START_TIME=1603244956004, UNIQ_KEY=C0A842C57BAC18B4AAC2691A53290000, CLUSTER=rocketmq-cluster, WAIT=true, DELAY=2, TAGS=SAVE_ORDER_DELAY_30, REAL_QID=3}, body=[123, 34, 111, 114, 100, 101, 114, 73, 110, 102, 111, 73, 100, 34, 58, 49, 50, 51, 44, 34, 112, 97, 121, 79, 114, 100, 101, 114, 73, 100, 34, 58, 50, 51, 52, 125], transactionId='null'}]
   
   MessageTest2.test1 log:
   
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA072F, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=1], queueOffset=188]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA0854, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=2], queueOffset=189]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA0979, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=3], queueOffset=190]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB3BD, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=0], queueOffset=195]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB4E2, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=1], queueOffset=196]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB607, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=2], queueOffset=197]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB72C, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=3], queueOffset=198]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA0DFE, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=0], queueOffset=191]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA0F23, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=1], queueOffset=192]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA1048, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=2], queueOffset=193]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA116D, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=3], queueOffset=194]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB851, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=0], queueOffset=199]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015AB976, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=1], queueOffset=200]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015ABA9B, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=2], queueOffset=201]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015ABBC0, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=3], queueOffset=202]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA1712, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=0], queueOffset=195]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA1837, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=1], queueOffset=196]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA195C, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=2], queueOffset=197]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D700002A9F000000000CAA1A81, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-a, queueId=3], queueOffset=198]
   SendResult [sendStatus=SEND_OK, msgId=C0A842C57BAC18B4AAC2691A53290000, offsetMsgId=C0A8C9D800002A9F00000000015ABCE5, messageQueue=MessageQueue [topic=testDelay_hjc9, brokerName=broker-b, queueId=0], queueOffset=203]
   ```java
   
   - What did you do (The steps to reproduce)?
   
   first start MessageTest2.main to await consume message
   second MessageTest2.test1 to send delay message
   
   - What did you expect to see?
   MessageTest2.main consume all delay message from MessageTest2.test1
   
   - What did you see instead?
   MessageTest2.main consume part delay message from MessageTest2.test1
   
   2. Please tell us about your environment:
   rocketmq version 4.7.1
   
   use rocketmq master/slave synch mode
   Broker | NO. | Address | Version | Produce Massage TPS | Consumer Massage TPS | Yesterday Produce Count | Yesterday Consume Count | Today Produce Count | Today Consume Count | Operation
   -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | --
   broker-b | 0(master) | ip2:10911 | V4_7_1 | 0.00 | 0.00 | 155 | 376 | 1423 | 848 | STATUS CONFIG
   broker-b | 1(slave) | ip1:10950 | V4_7_1 | 0.00 | 0.00 | 155 | 0 | 1423 | 0 | STATUS CONFIG
   broker-a | 0(master) | ip1:10911 | V4_7_1 | 0.00 | 0.00 | 391 | 886 | 2195 | 1173 | STATUS CONFIG
   broker-a | 1(slave) | ip2:10950 | V4_7_1 | 0.00 | 0.00 | 391 | 0 | 2195 | 0 | STATUS
   
   
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   
   **FEATURE REQUEST**
   
   1. Please describe the feature you are requesting.
   
   2. Provide any additional detail on your proposed use case for this feature.
   
   2. Indicate the importance of this issue to you (blocker, must-have, should-have, nice-to-have). Are you currently using any workarounds to address this issue?
   
   4. If there are some sub-tasks using -[] for each subtask and create a corresponding issue to map to the sub task:
   
   - [sub-task1-issue-number](example_sub_issue1_link_here): sub-task1 description here, 
   - [sub-task2-issue-number](example_sub_issue2_link_here): sub-task2 description here,
   - ...
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jaskey commented on issue #2367: consumer only consume part 'delay message'

Posted by GitBox <gi...@apache.org>.
Jaskey commented on issue #2367:
URL: https://github.com/apache/rocketmq/issues/2367#issuecomment-734045115


   Did you set batch consume size?  Since I saw you only consume one message in your listener callbac
   
                ``` System.out.println(list.get(0));```
   
   
   1. please replace to 
           
   
            for () {
                 System.out.println(list.get(0));
            }
   
   2. check you have have another consumer started
   
   
   3. check your queue size. Is your write queue size is not equals to read queue size?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org