Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/22 13:19:14 UTC

[GitHub] [pulsar] BewareMyPower opened a new issue, #19030: [Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment

BewareMyPower opened a new issue, #19030:
URL: https://github.com/apache/pulsar/issues/19030

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   OS: Ubuntu 20.04
   Pulsar: master (41edd2e)
   
   ### Minimal reproduce step
   
   Add a unit test that does these things:
   1. Create a producer to send N messages in the same batch.
   2. Create a consumer to receive these messages and store their `MessageId` objects, which share the same ledger id, entry id, and batch size and differ only in batch index.
   3. Round-trip each `MessageId` object through serialization (`MessageId#toByteArray`) and deserialization (`MessageId#fromByteArray`).
   4. Acknowledge the deserialized `MessageId` objects.
   5. Restart the consumer. It still receives the 1st message.
   
   ```java
       @Test
       public void testSerialization() throws Exception {
           var topic = "test-serialization-origin";
           @Cleanup var producer = pulsarClient.newProducer(Schema.INT32)
                   .topic(topic)
                   .batchingMaxMessages(100)
                   .batchingMaxPublishDelay(1, TimeUnit.DAYS)
                   .create();
           @Cleanup var consumer = pulsarClient.newConsumer(Schema.INT32)
                   .topic(topic)
                   .subscriptionName("sub")
                   .isAckReceiptEnabled(true)
                   .subscribe();
   
           final var numMessages = 10;
           for (int i = 0; i < numMessages; i++) {
               producer.sendAsync(i);
           }
           producer.flush();
           final var msgIds = new ArrayList<MessageId>();
           for (int i = 0; i < numMessages; i++) {
               msgIds.add(consumer.receive().getMessageId());
           }
           for (int i = 1; i < numMessages; i++) {
               final var lhs = (BatchMessageIdImpl) msgIds.get(0);
               final var rhs = (BatchMessageIdImpl) msgIds.get(i);
               assertEquals(lhs.getLedgerId(), rhs.getLedgerId());
               assertEquals(lhs.getEntryId(), rhs.getEntryId());
               assertEquals(lhs.getBatchSize(), rhs.getBatchSize());
               assertEquals(lhs.getBatchSize(), numMessages);
           }
   
           var deserializedMsgIds = new ArrayList<MessageId>();
           for (var msgId : msgIds) {
               var deserialized = MessageId.fromByteArray(msgId.toByteArray());
               assertTrue(deserialized instanceof BatchMessageIdImpl);
               deserializedMsgIds.add(deserialized);
           }
           for (var msgId : deserializedMsgIds) {
               consumer.acknowledge(msgId);
           }
           consumer.close();
   
           consumer = pulsarClient.newConsumer(Schema.INT32)
                   .topic(topic)
                   .subscriptionName("sub")
                   .isAckReceiptEnabled(true)
                   .subscribe();
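           // Reproduces the bug: a correct client would return null here,
           // because every message in the batch was acknowledged above.
           final var msg = consumer.receive(3, TimeUnit.SECONDS);
           assertNotNull(msg);
           assertEquals(msg.getValue(), 0);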
       }
   ```
   
   ### What did you expect to see?
   
   The restarted consumer should receive nothing.
   
   ### What did you see instead?
   
   The restarted consumer received the 1st message.
   
   ### Anything else?
   
   The root cause is https://github.com/apache/pulsar/pull/1424, which makes all `MessageId` instances in the same batch share the same `BatchMessageAcker` object. However, when `MessageId` instances are created by deserialization, it is impossible to make them share the same `BatchMessageAcker`.
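   
   To illustrate why sharing matters, here is a minimal, self-contained sketch. It does not use the Pulsar classes: `SketchAcker` and both helper methods are hypothetical names, and the model assumes that the client treats a batch as fully acknowledged only when a single acker has seen every batch index cleared.
   
   ```java
   import java.util.BitSet;
   
   final class BatchAckSketch {
       /** Hypothetical stand-in for a per-batch acker: one pending bit per message. */
       static final class SketchAcker {
           private final BitSet pending;
   
           SketchAcker(int batchSize) {
               pending = new BitSet(batchSize);
               pending.set(0, batchSize); // every batch index starts unacknowledged
           }
   
           /** Clears one index and reports whether the whole batch is now acknowledged. */
           boolean ackIndividual(int batchIndex) {
               pending.clear(batchIndex);
               return pending.isEmpty();
           }
       }
   
       /** All ids in the batch share one acker, as with ids taken from received messages. */
       static boolean ackAllWithSharedAcker(int batchSize) {
           SketchAcker shared = new SketchAcker(batchSize);
           boolean batchComplete = false;
           for (int i = 0; i < batchSize; i++) {
               batchComplete = shared.ackIndividual(i);
           }
           return batchComplete;
       }
   
       /** Each id gets its own fresh acker, which is the assumed situation after deserialization. */
       static boolean ackAllWithIndependentAckers(int batchSize) {
           boolean anyComplete = false;
           for (int i = 0; i < batchSize; i++) {
               SketchAcker own = new SketchAcker(batchSize);
               anyComplete |= own.ackIndividual(i);
           }
           return anyComplete;
       }
   
       public static void main(String[] args) {
           System.out.println("shared: " + ackAllWithSharedAcker(10));           // shared: true
           System.out.println("independent: " + ackAllWithIndependentAckers(10)); // independent: false
       }
   }
   ```
   
   In this model, with 10 messages per batch, the shared acker reports completion on the last acknowledgment, while none of the independent ackers ever does, which matches the redelivery of the 1st message seen in the test above.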
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on issue #19030: [Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment

Posted by github-actions.
github-actions[bot] commented on issue #19030:
URL: https://github.com/apache/pulsar/issues/19030#issuecomment-1399384572

   The issue had no activity for 30 days, mark with Stale label.




[GitHub] [pulsar] BewareMyPower commented on issue #19030: [Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #19030:
URL: https://github.com/apache/pulsar/issues/19030#issuecomment-1362843527

   The deserialization logic in https://github.com/apache/pulsar/pull/9855 is meaningless. https://github.com/apache/pulsar/blob/f85d591549ed0145c4a1e654f9bb204a18b1a6e7/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java#L99-L105
   
   Creating an independent `BatchMessageAcker` does not help.

