Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/11/14 11:00:50 UTC
[GitHub] [pulsar] candlerb opened a new issue #5664: Python API: flush not
blocking, send_async leaks memory
URL: https://github.com/apache/pulsar/issues/5664
**Describe the bug**
1. `producer.flush()` does not wait until all messages have been published
2. `producer.send_async()` appears to leak memory
**To Reproduce**
This program reproduces both behaviours.
```
from collections import defaultdict
import gc
import os
import pulsar
import subprocess
import time
NUM_MESSAGES = 500
MESSAGE_SIZE = 1_000_000
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('temp', producer_name='fred')
# Ensure that stdout is flushed on each line
old_print = print
def print(*a, **b):
    old_print(*a, flush=True, **b)

sent = 0
bytes = 0
results = defaultdict(lambda: 0)

# Callback invoked by the client once a message has been acknowledged
def ack(res, msg):
    global sent, bytes
    sent += 1
    bytes += len(msg.data())
    results[str(res)] += 1

for i in range(NUM_MESSAGES):
    data = b"x" * MESSAGE_SIZE
    producer.send_async(data, callback=ack)
print("Flushing...")
producer.flush()
print("Flush complete")
# Poll for up to 60 seconds until every callback has fired
for i in range(600):
    print("Sent: %d messages %d bytes" % (sent, bytes))
    if sent == NUM_MESSAGES:
        break
    time.sleep(0.1)
else:
    print("Never got all the messages!")
print("Results: %r" % dict(results))
gc.collect()
subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
print("Sleeping...")
time.sleep(5)
subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
producer.close()
client.close()
```
Output:
```
Flushing...
Flush complete
Sent: 150 messages 150000000 bytes
Sent: 172 messages 172000000 bytes
Sent: 180 messages 180000000 bytes
Sent: 197 messages 197000000 bytes
Sent: 215 messages 215000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 226 messages 226000000 bytes
Sent: 228 messages 228000000 bytes
Sent: 242 messages 242000000 bytes
Sent: 258 messages 258000000 bytes
Sent: 268 messages 268000000 bytes
Sent: 269 messages 269000000 bytes
Sent: 273 messages 273000000 bytes
Sent: 279 messages 279000000 bytes
Sent: 314 messages 314000000 bytes
Sent: 331 messages 331000000 bytes
Sent: 331 messages 331000000 bytes
Sent: 341 messages 341000000 bytes
Sent: 356 messages 356000000 bytes
Sent: 367 messages 367000000 bytes
Sent: 380 messages 380000000 bytes
Sent: 396 messages 396000000 bytes
Sent: 426 messages 426000000 bytes
Sent: 438 messages 438000000 bytes
Sent: 452 messages 452000000 bytes
Sent: 470 messages 470000000 bytes
Sent: 479 messages 479000000 bytes
Sent: 480 messages 480000000 bytes
Sent: 500 messages 500000000 bytes
Results: {'Ok': 500}
VSZ RSS COMMAND
591436 373396 python3
Sleeping...
VSZ RSS COMMAND
591436 373396 python3
```
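In the output above, the first status line after `Flush complete` reports only 150 of 500 callbacks. One way to block until every callback has run, without relying on `flush()`, is to count outstanding sends in the application. The following is a minimal sketch, not part of the Pulsar API: `CallbackTracker`, `fake_send_async` and `demo` are hypothetical names, and `fake_send_async` is only a stand-in that fires the callback from another thread so the example runs without a broker.

```python
import threading


class CallbackTracker:
    """Counts outstanding async sends and lets the caller wait for all of them."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0

    def wrap(self, callback):
        """Register one pending send and return a callback that marks it done."""
        with self._cond:
            self._pending += 1

        def wrapped(res, msg):
            try:
                callback(res, msg)
            finally:
                with self._cond:
                    self._pending -= 1
                    if self._pending == 0:
                        self._cond.notify_all()

        return wrapped

    def wait(self, timeout=None):
        """Block until every wrapped callback has run; return False on timeout."""
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)


def fake_send_async(data, callback):
    # Hypothetical stand-in for producer.send_async: the callback is
    # invoked later, from a different thread, with a result and the payload.
    threading.Timer(0.05, callback, args=("Ok", data)).start()


def demo(num_messages=20):
    confirmed = []
    tracker = CallbackTracker()
    for _ in range(num_messages):
        cb = tracker.wrap(lambda res, msg: confirmed.append(res))
        fake_send_async(b"x" * 10, cb)
    finished = tracker.wait(timeout=5)
    return finished, len(confirmed)
```

With the real producer, the send loop would presumably become `producer.send_async(data, callback=tracker.wrap(ack))` followed by `tracker.wait(timeout=60)`. This works around the symptom only; it does not explain why `flush()` returns early.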
**Expected behavior**
1. The [documentation](https://github.com/apache/pulsar/blob/master/pulsar-client-cpp/python/pulsar/__init__.py#L892) for "flush" says:
> Flush all the messages buffered in the client and wait until all messages have been successfully persisted
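Therefore I was expecting that by the time `producer.flush()` returns, all 500 messages would have been delivered. This is clearly not happening: in the run above, only 150 callbacks had fired when the first status line was printed after the flush.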
2. There appears to be a memory leak: once the callback has been invoked, the message buffer should be freed.
Increasing to 1000 messages (still at 1,000,000 bytes each) gives a larger memory footprint:
```
Results: {'Ok': 1000}
VSZ RSS COMMAND
881568 663396 python3
Sleeping...
VSZ RSS COMMAND
881568 663396 python3
```
However, if you change
```
producer.send_async(data, callback=ack)
```
to
```
res = producer.send(data)
ack(res, data)
```
and adjust the callback to use `len(msg)` instead of `len(msg.data())`, then the memory leak goes away:
```
Flushing...
Flush complete
Sent: 500 messages 500000000 bytes
Results: {'None': 500}
VSZ RSS COMMAND
249544 30484 python3
Sleeping...
VSZ RSS COMMAND
249544 30484 python3
```
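The three RSS figures reported above can be turned into a rough per-message estimate. The sketch below only redoes the arithmetic on those numbers; it assumes `ps` reports RSS in KiB (as procps does on Linux), and the constant and function names are introduced here for illustration.

```python
# Payload size of one message, in KiB
MESSAGE_SIZE_KIB = 1_000_000 / 1024

# RSS values (assumed KiB) copied from the ps output in the runs above
RSS_SYNC_500 = 30484      # 500 messages via producer.send()
RSS_ASYNC_500 = 373396    # 500 messages via producer.send_async()
RSS_ASYNC_1000 = 663396   # 1000 messages via producer.send_async()


def retained_per_message(rss_low, rss_high, extra_messages):
    """Average RSS growth in KiB attributed to each extra message."""
    return (rss_high - rss_low) / extra_messages


# Growth between the 500- and 1000-message async runs
growth = retained_per_message(RSS_ASYNC_500, RSS_ASYNC_1000, 500)

# Difference between the async and sync runs at 500 messages
vs_sync = retained_per_message(RSS_SYNC_500, RSS_ASYNC_500, 500)

# Fraction of one payload that stays resident per extra async message
fraction = growth / MESSAGE_SIZE_KIB

print("growth per extra message: %.1f KiB" % growth)
print("async vs sync per message: %.1f KiB" % vs_sync)
print("fraction of payload retained: %.2f" % fraction)
```

Under that assumption, the async runs grow by about 580 KiB per additional message, roughly 59% of one payload. RSS is a coarse measure, so this indicates that memory scales with the number of messages sent rather than showing exactly which buffers are retained.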
**Other info**
For comparison, here is a non-Pulsar, pure-Python program that allocates callbacks, each holding a 1,000,000-byte buffer, and does not leak.
```
import gc
import os
import subprocess
callbacks = []
total = 0
def make_callback(buf):
    # The closure keeps buf alive until the callback itself is released
    def mycb():
        global total
        total += len(buf)
    callbacks.append(mycb)

for i in range(500):
    buf = bytes([i % 256]) * 1_000_000
    make_callback(buf)
print("Waiting callbacks")
subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
while callbacks:
    cb = callbacks.pop(0)
    cb()
print(total)
print("After callbacks run and removed")
subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
gc.collect()
print("After gc")
subprocess.run(["ps","-eo","vsz,rss,comm","-q",str(os.getpid())])
```
Output:
```
Waiting callbacks
VSZ RSS COMMAND
521180 500492 python3
500000000
After callbacks run and removed
VSZ RSS COMMAND
32160 11472 python3
After gc
VSZ RSS COMMAND
32160 11472 python3
```
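The comparison above relies on `ps`, which cannot tell whether memory is held by Python objects or by native code. As a possible complementary check, the standard-library `tracemalloc` module reports only memory allocated through Python's allocators. The sketch below repeats the pure-Python experiment with it; `traced_mib` and `run_comparison` are names made up for this example, and it has not been run against the Pulsar client.

```python
import gc
import tracemalloc


def traced_mib():
    """Memory currently allocated through Python's allocators, in MiB."""
    current, _peak = tracemalloc.get_traced_memory()
    return current / (1024 * 1024)


def run_comparison(num_buffers=50, size=1_000_000):
    tracemalloc.start()
    baseline = traced_mib()

    # Each callback keeps its own buffer alive via a default argument
    callbacks = []
    buf = None
    for i in range(num_buffers):
        buf = bytes([i % 256]) * size
        callbacks.append(lambda buf=buf: len(buf))
    held = traced_mib() - baseline

    # Run and drop every callback, then drop the last local reference
    total = 0
    while callbacks:
        total += callbacks.pop(0)()
    buf = None
    gc.collect()
    released = traced_mib() - baseline

    tracemalloc.stop()
    return total, held, released


if __name__ == "__main__":
    total, held, released = run_comparison()
    print("total bytes seen by callbacks: %d" % total)
    print("traced while callbacks pending: %.1f MiB" % held)
    print("traced after callbacks released: %.1f MiB" % released)
```

If the same measurement around the `send_async` loop showed traced memory returning to baseline while RSS stayed high, that would suggest the buffers are being retained outside the Python heap. That reading is an inference, not something the output above establishes.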