Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/11/14 11:00:50 UTC

[GitHub] [pulsar] candlerb opened a new issue #5664: Python API: flush not blocking, send_async leaks memory

URL: https://github.com/apache/pulsar/issues/5664
 
 
   **Describe the bug**
   
   1. `producer.flush()` does not wait until all messages have been published
   2. `producer.send_async()` appears to leak memory
   
   **To Reproduce**
   
   This program reproduces both behaviours.
   
   ```python
   from collections import defaultdict
   import gc
   import os
   import pulsar
   import subprocess
   import time
   
   NUM_MESSAGES = 500
   MESSAGE_SIZE = 1_000_000
   
   client = pulsar.Client('pulsar://localhost:6650')
   
   producer = client.create_producer('temp', producer_name='fred')
   
   # Ensure that stdout is flushed on each line
   old_print = print
   def print(*a, **b):
       old_print(*a, flush=True, **b)
   
   sent = 0
   bytes = 0
   results = defaultdict(lambda: 0)
   def ack(res, msg):
       global sent, bytes
       sent += 1
       bytes += len(msg.data())
       results[str(res)] += 1
   
   for i in range(NUM_MESSAGES):
       data = b"x" * MESSAGE_SIZE
       producer.send_async(data, callback=ack)
   
   print("Flushing...")
   producer.flush()
   print("Flush complete")
   for i in range(600):
       print("Sent: %d messages %d bytes" % (sent, bytes))
       if sent == NUM_MESSAGES:
           break
       time.sleep(0.1)
   else:
       print("Never got all the messages!")
   print("Results: %r" % dict(results))
   
   gc.collect()
   subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
   print("Sleeping...")
   time.sleep(5)
   subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
   
   producer.close()
   client.close()
   ```
   
   Output:
   
   ```
   Flushing...
   Flush complete
   Sent: 150 messages 150000000 bytes
   Sent: 172 messages 172000000 bytes
   Sent: 180 messages 180000000 bytes
   Sent: 197 messages 197000000 bytes
   Sent: 215 messages 215000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 226 messages 226000000 bytes
   Sent: 228 messages 228000000 bytes
   Sent: 242 messages 242000000 bytes
   Sent: 258 messages 258000000 bytes
   Sent: 268 messages 268000000 bytes
   Sent: 269 messages 269000000 bytes
   Sent: 273 messages 273000000 bytes
   Sent: 279 messages 279000000 bytes
   Sent: 314 messages 314000000 bytes
   Sent: 331 messages 331000000 bytes
   Sent: 331 messages 331000000 bytes
   Sent: 341 messages 341000000 bytes
   Sent: 356 messages 356000000 bytes
   Sent: 367 messages 367000000 bytes
   Sent: 380 messages 380000000 bytes
   Sent: 396 messages 396000000 bytes
   Sent: 426 messages 426000000 bytes
   Sent: 438 messages 438000000 bytes
   Sent: 452 messages 452000000 bytes
   Sent: 470 messages 470000000 bytes
   Sent: 479 messages 479000000 bytes
   Sent: 480 messages 480000000 bytes
   Sent: 500 messages 500000000 bytes
   Results: {'Ok': 500}
      VSZ   RSS COMMAND
   591436 373396 python3
   Sleeping...
      VSZ   RSS COMMAND
   591436 373396 python3
   ```
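In the run above, `flush()` returned when only 150 of the 500 callbacks had fired. A caller who needs to know that every message has been acknowledged can count the outstanding callbacks itself. The sketch below is a hypothetical helper, `PendingSends`, and is not part of the `pulsar` API. It assumes only that `send_async` eventually invokes its callback exactly once per message with `(res, msg)`, as in the reproducer.

```python
import threading


class PendingSends:
    """Hypothetical helper (not part of the pulsar API): tracks send_async
    calls whose callback has not fired yet, so the caller can block until
    every callback has run instead of relying on producer.flush()."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0

    def wrap(self, callback):
        # Register one outstanding send and return a callback that marks it done.
        with self._cond:
            self._pending += 1

        def done(res, msg):
            try:
                callback(res, msg)
            finally:
                with self._cond:
                    self._pending -= 1
                    if self._pending == 0:
                        self._cond.notify_all()

        return done

    def wait(self, timeout=None):
        # True once no sends are outstanding; False if the timeout expires first.
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)
```

To use this in the reproducer, pass `callback=pending.wrap(ack)` to `producer.send_async()` and call `pending.wait(timeout=60)` after `producer.flush()`. This only works around the symptom. It does not explain why `flush()` returns early.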
   
   **Expected behavior**
   
   1. The [documentation](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py#L892) for `flush()` says:
   
       > Flush all the messages buffered in the client and wait until all messages have been successfully persisted
   
       Therefore I expected all 500 messages to have been delivered by the time `producer.flush()` returned. Instead, only 150 callbacks had fired when "Flush complete" was printed, and the remaining 350 arrived over the next few seconds.
   
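   2. `send_async()` appears to leak memory. Once a message's callback has been invoked, its buffer should be freed. Instead, the process RSS is still 373,396 kB after all 500 callbacks have run and `gc.collect()` has been called, and it does not shrink during the following 5-second sleep.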
   
   Increasing to 1000 messages (still at 1,000,000 bytes each) gives a larger memory footprint:
   
   ```
   Results: {'Ok': 1000}
      VSZ   RSS COMMAND
   881568 663396 python3
   Sleeping...
      VSZ   RSS COMMAND
   881568 663396 python3
   ```
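One way to narrow down where the retained memory lives is to compare Python's own allocation counter with the process RSS. The sketch below uses the standard `tracemalloc` module and reads `VmRSS` from `/proc/self/status`. It therefore assumes Linux, as the `ps -q` calls above already do. `rss_kb` and `report` are illustrative names, not part of any library.

```python
import tracemalloc


def rss_kb():
    """Return the current resident set size in kB (Linux only: reads /proc)."""
    with open("/proc/self/status") as f:
        for line in f:
            if line.startswith("VmRSS:"):
                return int(line.split()[1])  # line looks like "VmRSS:   12345 kB"
    raise RuntimeError("VmRSS not found in /proc/self/status")


def report(label):
    """Print and return (Python-traced bytes, RSS in kB) at this point."""
    # tracemalloc only counts memory obtained through Python's allocators;
    # buffers that a C/C++ extension allocates itself (malloc/new) are not included.
    traced, _peak = tracemalloc.get_traced_memory()
    rss = rss_kb()
    print("%-16s python=%8d kB  rss=%8d kB" % (label, traced // 1024, rss))
    return traced, rss
```

To apply it, call `tracemalloc.start()` at the top of the reproducer, then call `report(...)` wherever it currently runs `ps`. If `python=` drops back to a few MB while `rss=` stays high, that would suggest the buffers are held in native memory inside the client library rather than by Python objects in the script. This is a diagnostic hypothesis, not a confirmed cause.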
   
   However, if you change
   
   ```python
   producer.send_async(data, callback=ack)
   ```
   
   to
   
   ```python
   res = producer.send(data)
   ack(res, data)
   ```
   
   and adjust the callback to use `len(msg)` instead of `len(msg.data())`, then the memory leak goes away:
   
   ```
   Flushing...
   Flush complete
   Sent: 500 messages 500000000 bytes
   Results: {'None': 500}
      VSZ   RSS COMMAND
   249544 30484 python3
   Sleeping...
      VSZ   RSS COMMAND
   249544 30484 python3
   ```
   
   **Other info**
   
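   For comparison, here is a pure-Python program, with no Pulsar involved, that creates 500 callbacks, each holding a 1,000,000-byte buffer. It does not leak: RSS drops back to about 11 MB once the callbacks have been run and removed.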
   
   ```python
   import gc
   import os
   import subprocess
   
   callbacks = []
   total = 0
   
   def make_callback(buf):
       def mycb():
           global total
           total += len(buf)
       callbacks.append(mycb)
   
   for i in range(500):
       buf = bytes([i % 256]) * 1_000_000
       make_callback(buf)
   
   print("Waiting callbacks")
   subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
   
   while callbacks:
       cb = callbacks.pop(0)
       cb()
   print(total)
   print("After callbacks run and removed")
   subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
   gc.collect()
   print("After gc")
   subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
   ```
   
   Output:
   
   ```
   Waiting callbacks
      VSZ   RSS COMMAND
   521180 500492 python3
   500000000
   After callbacks run and removed
      VSZ   RSS COMMAND
    32160 11472 python3
   After gc
      VSZ   RSS COMMAND
    32160 11472 python3
   ```
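The same conclusion can be checked without shelling out to `ps`: a `weakref.finalize` hook attached to each buffer reports the moment that buffer is deallocated. This is a minimal sketch that assumes CPython, whose reference counting frees objects immediately. `Buf` and `run_demo` are illustrative names. `Buf` exists only because plain `bytes` and `bytearray` objects do not support weak references.

```python
import weakref


class Buf(bytearray):
    """bytearray subclass; plain bytes/bytearray objects cannot be weakly referenced."""


def run_demo(n=5, size=1_000_000):
    callbacks = []
    freed = []  # indices of buffers that have been deallocated

    def make_callback(i):
        buf = Buf(size)
        # Call freed.append(i) as soon as this buffer is deallocated.
        weakref.finalize(buf, freed.append, i)

        def mycb():
            return len(buf)

        callbacks.append(mycb)

    for i in range(n):
        make_callback(i)
    freed_before = list(freed)  # every buffer is still held by its closure

    while callbacks:
        cb = callbacks.pop(0)
        cb()
        del cb  # drop the last reference to the closure, and with it the buffer
    return freed_before, list(freed)


before, after = run_demo()
print("freed before callbacks ran:", before)
print("freed after callbacks ran: ", after)
```

Under CPython this prints `[]` and then `[0, 1, 2, 3, 4]`. No buffer is freed while its closure is still queued, and each one is freed as soon as its callback is popped and dropped.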
   
