Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/11 15:28:54 UTC

[GitHub] [pulsar] reswqa opened a new issue #8925: messages may be out of order in transactions

reswqa opened a new issue #8925:
URL: https://github.com/apache/pulsar/issues/8925


   **Describe the bug**
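   With transactions enabled, the first message sent to each partition must first go to the transaction coordinator (TC) to register that partition's metadata with the transaction. Because this registration is asynchronous, later messages to the same partition may be sent before the first one, so messages can arrive out of order.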
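   
   The race described above can be modelled without a broker. The following is a minimal sketch in plain Java, not Pulsar's actual client code: `OrderingSketch`, `sendAll`, and the `registration` future are hypothetical names, and the future only stands in for the asynchronous "add partition to transaction" step. It shows how a first message that waits on that step is overtaken, and how chaining each send behind the previous one would keep the order.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   
   // Hypothetical model of the ordering problem; no Pulsar classes are used.
   class OrderingSketch {
   
       // Returns the order in which messages 0..count-1 reach the (simulated) partition log.
       static List<Integer> sendAll(int count, boolean chain) {
           List<Integer> log = new ArrayList<>();
           // Stand-in for the asynchronous partition registration with the TC.
           CompletableFuture<Void> registration = new CompletableFuture<>();
           CompletableFuture<Void> tail = registration;
           for (int i = 0; i < count; i++) {
               final int value = i;
               if (i == 0) {
                   // The first message is only written once registration completes.
                   tail = registration.thenRun(() -> log.add(value));
               } else if (chain) {
                   // Ordered variant: each send waits for the previous one.
                   tail = tail.thenRun(() -> log.add(value));
               } else {
                   // Unordered variant: later messages are written immediately.
                   log.add(value);
               }
           }
           // Registration finishes only after all sends have been issued.
           registration.complete(null);
           return log;
       }
   
       public static void main(String[] args) {
           System.out.println(sendAll(5, false)); // prints [1, 2, 3, 4, 0]
           System.out.println(sendAll(5, true));  // prints [0, 1, 2, 3, 4]
       }
   }
   ```
   
   In the unchained run the first message lands last, which matches the symptom reported here. Whether the client should chain sends or queue them until registration completes is a design decision this sketch does not settle.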
   
   **To Reproduce**
   ```
           Transaction txn = pulsarClient
                   .newTransaction()
                   .withTransactionTimeout(5, TimeUnit.SECONDS)
                   .build().get();
           log.info("init transaction {}.", txn);
   
           @Cleanup
           Producer<Integer> producer = pulsarClient
                   .newProducer(Schema.INT32)
                   .sendTimeout(0, TimeUnit.SECONDS)
                   .topic(PRODUCE_COMMIT_TOPIC)
                   .create();
   
           int incomingMessageCnt = 20;
           List<Integer> expected = new ArrayList<>();
           List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
           for(int i = 0; i<incomingMessageCnt; i++){
               CompletableFuture<MessageId> future =
                       producer.newMessage(txn).value(i).sendAsync();
               expected.add(i);
               futureList.add(future);
           }
           for(CompletableFuture<MessageId> future : futureList){
               future.get();
           }
           txn.commit().get();
   
           @Cleanup
           Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                   .topic(PRODUCE_COMMIT_TOPIC)
                   .subscriptionName("messageOrderTest")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .enableBatchIndexAcknowledgment(true)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
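           List<Integer> actual = new ArrayList<>();
           while (actual.size() < expected.size()) {
               Message<Integer> message = consumer.receive(10, TimeUnit.SECONDS);
               // receive() returns null on timeout; fail explicitly instead of throwing an NPE.
               if (message == null) {
                   Assert.fail("timed out after receiving " + actual.size() + " messages");
               }
               actual.add(message.getValue());
           }
           // Messages must be received in the order they were sent.
           Assert.assertEquals(actual, expected);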
           log.info("finish test messageOrderTest");
   ```
   The assertion in the code above fails because the first message is received out of order.
   
   **Expected behavior**
   The assertion passes: messages are received in the order they were sent.
   
   **Desktop (please complete the following information):**
    - OS: Mac OS 11.0.1
   

