Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/01 16:42:12 UTC

[GitHub] [pulsar] lucasrpb opened a new issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

lucasrpb opened a new issue #11883:
URL: https://github.com/apache/pulsar/issues/11883


   **Describe the bug**
   
   The official Apache Pulsar documentation says that an exclusive subscription sees a consistent order for a single consumer. Suppose we have multiple consumers, each with its own exclusive subscription, all reading from the same partitioned topic. As the docs put it:
   
   "Decisions about routing and subscription modes **can be made separately in most cases**. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.
   
   **There is no difference between partitioned topics and normal topics in terms of how subscription modes work**, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer."
   
   <img src="https://user-images.githubusercontent.com/1669154/131706634-655d2fd8-6ee1-429f-8eda-47728bea3a51.png" width="400" >
   
   From those statements I inferred that every reader would see the same global order across the topic's partitions. That is not what my tests show so far:
   
   <img src="https://user-images.githubusercontent.com/1669154/131708602-554d35ea-e8d3-4b65-8b75-0c2171b5c799.PNG" width="800" >
   
   
   The code I have used to test it (Scala): 
   
   **Producer:**
   
   
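         package pulsar

         import org.apache.pulsar.client.api.PulsarClient
         import java.util.UUID

         // SERVICE_URL and TOPIC are not shown in this snippet; they are assumed
         // to be defined elsewhere in the `pulsar` package.
         object Producer {

           def main(args: Array[String]): Unit = {

             val client = PulsarClient.builder()
               .serviceUrl(SERVICE_URL)
               .allowTlsInsecureConnection(true)
               .build()

             val producer = client.newProducer()
               .topic(TOPIC)
               .enableBatching(true)
               //.accessMode(ProducerAccessMode.Exclusive)
               .create()

             for (i <- 0 until 100) {
               // The payload is a random UUID string; it is not used as a message key.
               val payload = UUID.randomUUID().toString.getBytes()
               //val payload = s"Hello-${i}".getBytes()
               // Note: orderingKey is used for Key_Shared dispatch; partition routing
               // uses key(), which is not set here.
               producer.newMessage().orderingKey("k0".getBytes()).value(payload).send()

               println(s"produced msg: ${i.toString}")
             }

             producer.flush()

             producer.close()
             client.close()
           }

         }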
   
   
   
   **Consumer:**
   
      ```
    import org.apache.pulsar.client.api.{PulsarClient, SubscriptionInitialPosition, SubscriptionType}
   
       object Consumer  {
       
             def main(args: Array[String]): Unit = {
           
               val client = PulsarClient.builder()
                 .serviceUrl(SERVICE_URL)
                 .allowTlsInsecureConnection(true)
                 .build()
           
               var l1 = Seq.empty[String]
               var l2 = Seq.empty[String]
           
               val c1 = client.newConsumer()
                 .topic(TOPIC)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionName("c1")
                 .subscribe()
           
               // To stop consuming I use a limit (100); the number of produced messages is known
               while(l1.length < 100){
                 val msg = c1.receive()
                 val str = new String(msg.getData)
           
                 println(s"${Console.MAGENTA_B}$str${Console.RESET}")
           
                 l1 = l1 :+ str
           
                 //c1.acknowledge(msg.getMessageId)
               }
           
               val c2 = client.newConsumer()
                 .topic(TOPIC)
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscriptionName("c2")
                 .subscribe()
           
               while(l2.length < 100){
                 val msg = c2.receive()
                 val str = new String(msg.getData)
           
                 println(s"${Console.GREEN_B}$str${Console.RESET}")
           
                 l2 = l2 :+ str
           
                 //c2.acknowledge(msg.getMessageId)
               }
           
               println()
               println(l1)
               println()
           
               println()
               println(l2)
               println()
           
               try {
                 assert(l1 == l2)
               } finally {
                 c1.close()
                 c2.close()
                 client.close()
               }
             }
       
       }
   ```
   
   
   Am I misreading the docs, or does Pulsar not support the behavior I expect?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-1058886579


   The issue has had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] lucasrpb commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
lucasrpb commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-912613478


   > > Just to make sure @shibd : Using the combo of SinglePartition config for the producer and ExclusiveSubscription for the consumer would not guarantee ordering?
   > 
   > This can guarantee ordering.
   
   https://stackoverflow.com/questions/68988929/does-pulsar-partitioned-topics-support-global-ordering-when-consuming-from-diffe/69032885#69032885





[GitHub] [pulsar] shibd commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
shibd commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-912603337


   > Just to make sure @shibd : Using the combo of SinglePartition config for the producer and ExclusiveSubscription for the consumer would not guarantee ordering?
   
   This can guarantee ordering. 
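
   A minimal sketch of that combination in Scala, for illustration only. The object name, service URL, topic, and subscription name below are placeholders, not values from this thread, and the sketch assumes a fresh topic and subscription plus a reachable broker. With `MessageRoutingMode.SinglePartition` and no message key, the producer picks one partition and publishes everything there, so an exclusive subscriber should receive that producer's messages in publish order. This holds per producer: several producers may each pick a different partition.

   ```scala
   import org.apache.pulsar.client.api.{MessageRoutingMode, PulsarClient, SubscriptionInitialPosition, SubscriptionType}

   object SinglePartitionSketch {
     // Placeholder values (assumptions); replace with your own broker URL and topic.
     val SERVICE_URL = "pulsar://localhost:6650"
     val TOPIC = "persistent://public/default/my-partitioned-topic"

     def main(args: Array[String]): Unit = {
       val client = PulsarClient.builder().serviceUrl(SERVICE_URL).build()

       // Subscribe first so the subscription exists before anything is published.
       val consumer = client.newConsumer()
         .topic(TOPIC)
         .subscriptionType(SubscriptionType.Exclusive)
         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
         .subscriptionName("single-partition-sketch")
         .subscribe()

       // SinglePartition: without a message key, all messages go to one partition.
       val producer = client.newProducer()
         .topic(TOPIC)
         .messageRoutingMode(MessageRoutingMode.SinglePartition)
         .create()

       val sent = (0 until 100).map(i => s"msg-$i")
       sent.foreach(s => producer.newMessage().value(s.getBytes("UTF-8")).send())

       val received = (0 until 100).map { _ =>
         val msg = consumer.receive()
         consumer.acknowledge(msg)
         new String(msg.getData, "UTF-8")
       }

       // Expected to hold only under the assumptions stated above.
       try assert(received == sent)
       finally { producer.close(); consumer.close(); client.close() }
     }
   }
   ```

   The trade-off is throughput: a single producer no longer spreads its load across partitions, which is the usual reason for partitioning in the first place.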





[GitHub] [pulsar] shibd commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
shibd commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-912354476


   @lucasrpb   
   
   I think Pulsar only guarantees ordered consumption within a single partition. If a subscription consumes multiple partitions of a topic, a consistent global order cannot be guaranteed.
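
   The distinction can be illustrated without a broker. The following is a toy model in plain Scala, not Pulsar code: `route`, `consume`, `preservesPartitionOrder`, and the delivery schedules are hypothetical simplifications. It splits six messages round-robin across two partitions and lets two consumers drain them with different interleavings. Each consumer still sees every partition's messages in order, yet the two global sequences differ.

   ```scala
   object PartitionOrderSketch {

     /** Simplified round-robin routing: message i goes to partition i % numPartitions. */
     def route(messages: Seq[String], numPartitions: Int): Vector[Vector[String]] =
       Vector.tabulate(numPartitions) { p =>
         messages.zipWithIndex.collect { case (m, i) if i % numPartitions == p => m }.toVector
       }

     /** Drain the partitions; `schedule` lists which partition delivers its next message. */
     def consume(partitions: Vector[Vector[String]], schedule: Seq[Int]): Vector[String] = {
       val cursors = Array.fill(partitions.length)(0)
       val out = Vector.newBuilder[String]
       schedule.foreach { p =>
         if (cursors(p) < partitions(p).length) {
           out += partitions(p)(cursors(p))
           cursors(p) += 1
         }
       }
       out.result()
     }

     /** True if `received` keeps the relative order of every partition's log. */
     def preservesPartitionOrder(partitions: Vector[Vector[String]], received: Seq[String]): Boolean =
       partitions.forall(log => received.filter(m => log.contains(m)) == log)

     def main(args: Array[String]): Unit = {
       val partitions = route(Seq("m0", "m1", "m2", "m3", "m4", "m5"), 2)

       // Two consumers whose partitions happen to deliver in a different interleaving.
       val c1 = consume(partitions, Seq(0, 1, 0, 1, 0, 1))
       val c2 = consume(partitions, Seq(1, 1, 0, 0, 1, 0))

       println(c1) // Vector(m0, m1, m2, m3, m4, m5)
       println(c2) // Vector(m1, m3, m0, m2, m5, m4)
       println(preservesPartitionOrder(partitions, c1)) // true
       println(preservesPartitionOrder(partitions, c2)) // true
       println(c1 == c2) // false
     }
   }
   ```

   In this model, an assertion like `assert(l1 == l2)` from the issue's consumer test can fail even though neither consumer ever sees a single partition out of order.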
   
   





[GitHub] [pulsar] miguelemosreverte commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
miguelemosreverte commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-912549937


   Just to make sure @shibd : Using the combo of SinglePartition config for the producer and ExclusiveSubscription for the consumer would not guarantee ordering?





[GitHub] [pulsar] lucasrpb commented on issue #11883: Exclusive subscription mode for partitioned topics not seeing consistent order among different consumers

Posted by GitBox <gi...@apache.org>.
lucasrpb commented on issue #11883:
URL: https://github.com/apache/pulsar/issues/11883#issuecomment-912617775


   Thanks, guys, for the explanations! Unfortunately, with the current implementation I cannot get the behavior I want. But it should be easy to achieve with an approach I already have in mind! :) Maybe in the near future I can propose a pull request for it! :)

