Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/18 09:46:00 UTC

[GitHub] [pulsar] swjz opened a new issue #14372: `acknowledge_cumulative()` does not work with a consumer subscribed to multiple topics.

swjz opened a new issue #14372:
URL: https://github.com/apache/pulsar/issues/14372


   **Describe the bug**
   On `pulsar-client==2.9.1` (python), `acknowledge_cumulative()` does not work with a consumer subscribed to multiple topics. 
   
   **To Reproduce**
   Set up a Pulsar server on `localhost:6650` and create two topics, `topic1` and `topic2`. Make sure each topic has at least one message in it so there is something to consume later.
   
   Run the following code to consume a message.
   
   ```python
   import pulsar
   from _pulsar import InitialPosition
   
   client = pulsar.Client(service_url='pulsar://localhost:6650')
   
   consumer = client.subscribe(['topic1', 'topic2'], subscription_name='test', initial_position=InitialPosition.Earliest)
   msg = consumer.receive()
   
   consumer.acknowledge_cumulative(msg)
   ```
   
   The last line, `consumer.acknowledge_cumulative(msg)`, raises `RuntimeError: bad_function_call`.
   
   
   **Expected behavior**
   No error should be raised.
   
   **Desktop (please complete the following information):**
    - OS: Ubuntu 20.04.3 LTS
    - Python 3.8.10
    - `pulsar-client==2.9.1`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] swjz commented on issue #14372: `acknowledge_cumulative()` does not work with a consumer subscribed to multiple topics.

Posted by GitBox <gi...@apache.org>.
swjz commented on issue #14372:
URL: https://github.com/apache/pulsar/issues/14372#issuecomment-1044994429


   Thanks a lot, Yunze! I think the Python API should also raise a clear error directly when a consumer subscribed to multiple topics attempts a cumulative ACK. In this case the C++ consumer returned `ResultOperationNotSupported`, but the Python consumer was not aware of this specific error, which confused me.
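
One way to picture the explicit error suggested above is a thin check on the application side. This is only a sketch under stated assumptions: `CumulativeAckNotSupported` and `acknowledge_cumulative_checked` are hypothetical names, not part of `pulsar-client`, and `StubConsumer` stands in for a real consumer so the snippet runs without a broker. The check only inspects the `topics` value passed to `subscribe()`, so it would not catch every multi-topic case.

```python
class CumulativeAckNotSupported(Exception):
    """Hypothetical error type; not part of pulsar-client."""


def acknowledge_cumulative_checked(consumer, msg, topics):
    """Fail fast with a readable error on a multi-topic subscription.

    `topics` is assumed to be the same value that was passed to
    client.subscribe(): a single topic name or a list of names.
    """
    if not isinstance(topics, str) and len(topics) > 1:
        raise CumulativeAckNotSupported(
            "cumulative ack is not supported when subscribed to "
            "{} topics; acknowledge messages individually".format(len(topics))
        )
    consumer.acknowledge_cumulative(msg)


class StubConsumer:
    """Stand-in for a real consumer; records cumulative acks."""

    def __init__(self):
        self.cumulative_acks = []

    def acknowledge_cumulative(self, msg):
        self.cumulative_acks.append(msg)


stub = StubConsumer()
# Single topic: the call is passed through to the consumer.
acknowledge_cumulative_checked(stub, "m1", "topic1")
try:
    # Two topics: rejected before the consumer is touched.
    acknowledge_cumulative_checked(stub, "m2", ["topic1", "topic2"])
except CumulativeAckNotSupported as e:
    print("ack failed: {}".format(e))
```

With a check like this, the failure names the actual limitation instead of surfacing as `bad_function_call`.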
   
   Also thanks for fixing the close process! I thought it was related but didn't figure out the root cause. I'll close my PR then.





[GitHub] [pulsar] BewareMyPower commented on issue #14372: `acknowledge_cumulative()` does not work with a consumer subscribed to multiple topics.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on issue #14372:
URL: https://github.com/apache/pulsar/issues/14372#issuecomment-1044415183


   > The last line acknowledge_cumulative() throws RuntimeError: bad_function_call.
   
   The C++ consumer doesn't support cumulative ACK for multiple topics; see https://github.com/apache/pulsar/blob/0facd24e8eec2df56f6f241ce2cf87eb98590a7e/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc#L558-L560
   
   I tried to add support for it a long time ago (see #6796), but the feature seems to be controversial.
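
Since cumulative ACK is not available for a multi-topic consumer, one possible workaround is to acknowledge each message individually with `consumer.acknowledge(msg)`. The sketch below assumes the caller already knows whether the subscription spans several topics. `ack_received` and `RecordingConsumer` are hypothetical names, and the recording class only stands in for a real consumer so the snippet runs without a broker.

```python
def ack_received(consumer, messages, multi_topic):
    """Acknowledge `messages`, given in the order they were received."""
    if not messages:
        return
    if multi_topic:
        # Cumulative ack is rejected here, so ack one message at a time.
        for msg in messages:
            consumer.acknowledge(msg)
    else:
        # Single topic: one cumulative ack on the last message received.
        consumer.acknowledge_cumulative(messages[-1])


class RecordingConsumer:
    """Stand-in for a real consumer; records which ack call was made."""

    def __init__(self):
        self.calls = []

    def acknowledge(self, msg):
        self.calls.append(("individual", msg))

    def acknowledge_cumulative(self, msg):
        self.calls.append(("cumulative", msg))


multi = RecordingConsumer()
ack_received(multi, ["m1", "m2"], multi_topic=True)

single = RecordingConsumer()
ack_received(single, ["m1", "m2"], multi_topic=False)

print(multi.calls)
print(single.calls)
```

The trade-off is one acknowledge call per message on the multi-topic path instead of a single cumulative call.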
   
   I modified your script to
   
   ```python
   import pulsar
   from _pulsar import InitialPosition
   
   client = pulsar.Client(service_url='pulsar://localhost:6650')
   
   consumer = client.subscribe(['topic1', 'topic2'], subscription_name='test', initial_position=InitialPosition.Earliest)
   msg = consumer.receive()
   
   try:
       consumer.acknowledge_cumulative(msg)
   except Exception as e:
       print("ack failed: {}".format(e))
   
   client.close()
   ```
   
   and produced one message to `topic1`. The output was:
   
   ```
   2022-02-18 20:00:22.871 INFO  [0x1119d1600] ClientConnection:182 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
   2022-02-18 20:00:22.871 INFO  [0x1119d1600] ConnectionPool:96 | Created connection for pulsar://localhost:6650
   2022-02-18 20:00:22.873 INFO  [0x70000325b000] ClientConnection:368 | [127.0.0.1:50215 -> 127.0.0.1:6650] Connected to broker
   2022-02-18 20:00:22.876 INFO  [0x70000325b000] HandlerBase:64 | [persistent://public/default/topic1, test, 0] Getting connection from pool
   2022-02-18 20:00:22.876 INFO  [0x70000325b000] HandlerBase:64 | [persistent://public/default/topic2, test, 1] Getting connection from pool
   2022-02-18 20:00:22.878 INFO  [0x70000325b000] ConsumerImpl:224 | [persistent://public/default/topic1, test, 0] Created consumer on broker [127.0.0.1:50215 -> 127.0.0.1:6650] 
   2022-02-18 20:00:22.879 INFO  [0x70000325b000] ConsumerImpl:224 | [persistent://public/default/topic2, test, 1] Created consumer on broker [127.0.0.1:50215 -> 127.0.0.1:6650] 
   2022-02-18 20:00:22.879 INFO  [0x70000325b000] MultiTopicsConsumerImpl:95 | Successfully Subscribed to Topics
   ack failed: std::exception
   2022-02-18 20:00:22.901 INFO  [0x1119d1600] ClientImpl:496 | Closing Pulsar client with 0 producers and 1 consumers
   2022-02-18 20:00:22.901 INFO  [0x1119d1600] ConsumerImpl:999 | [persistent://public/default/topic1, test, 0] Closing consumer for topic persistent://public/default/topic1
   2022-02-18 20:00:22.901 INFO  [0x1119d1600] ConsumerImpl:999 | [persistent://public/default/topic2, test, 1] Closing consumer for topic persistent://public/default/topic2
   2022-02-18 20:00:22.902 INFO  [0x70000325b000] ConsumerImpl:1055 | [persistent://public/default/topic1, test, 0] Closed consumer 0
   2022-02-18 20:00:22.903 INFO  [0x70000325b000] ConsumerImpl:1055 | [persistent://public/default/topic2, test, 1] Closed consumer 1
   2022-02-18 20:00:22.903 INFO  [0x70000325b000] ClientConnection:1548 | [127.0.0.1:50215 -> 127.0.0.1:6650] Connection closed
   2022-02-18 20:00:22.903 INFO  [0x70000325b000] ClientConnection:256 | [127.0.0.1:50215 -> 127.0.0.1:6650] Destroyed connection
   ```
   
   It looks like this issue is related to the close process, not the cumulative acknowledgement.
   
   





[GitHub] [pulsar] swjz commented on issue #14372: `acknowledge_cumulative()` does not work with a consumer subscribed to multiple topics.

Posted by GitBox <gi...@apache.org>.
swjz commented on issue #14372:
URL: https://github.com/apache/pulsar/issues/14372#issuecomment-1044235326


   It looks like each consumer's iterator is erased here:
   https://github.com/apache/pulsar/blob/773f9197cfb83092fb6ee2bf1b9ec38143f3c8b0/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc#L393-L396
   while the for loop here is still running:
   https://github.com/apache/pulsar/blob/773f9197cfb83092fb6ee2bf1b9ec38143f3c8b0/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc#L377-L385
   so the following `consumer++` operates on an erased iterator and causes a segfault.
   
   Please correct me if I'm wrong since I'm new to this project. I'll also try to submit a fix soon.
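
The hazard described above, removing entries from a container while a loop is still walking it, can be illustrated in Python, the language of the reproduction script. This is an analogy only, not the C++ code from `MultiTopicsConsumerImpl.cc`, and it does not establish the root cause of this issue. `StubConsumer`, `close_all_unsafe`, and `close_all_safe` are hypothetical names. Where C++ leaves an erased iterator invalid, CPython detects that the `dict` changed size and raises `RuntimeError`; iterating over a snapshot of the keys avoids the problem.

```python
class StubConsumer:
    """Stand-in for a per-topic consumer."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def close_all_unsafe(consumers):
    for topic in consumers:
        consumers[topic].close()
        del consumers[topic]  # mutates the dict while it is being iterated


def close_all_safe(consumers):
    for topic in list(consumers):  # iterate over a snapshot of the keys
        consumers.pop(topic).close()


consumers = {"topic1": StubConsumer(), "topic2": StubConsumer()}

try:
    close_all_unsafe(dict(consumers))  # work on a copy of the mapping
except RuntimeError as e:
    print("unsafe loop failed: {}".format(e))

close_all_safe(consumers)
print(len(consumers))
```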
   
   

