You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Jiri Daněk (JIRA)" <ji...@apache.org> on 2018/05/23 11:30:00 UTC
[jira] [Created] (PROTON-1851) [python] Unable to send messages to
newly connected server after reconnect has happened
Jiri Daněk created PROTON-1851:
----------------------------------
Summary: [python] Unable to send messages to newly connected server after reconnect has happened
Key: PROTON-1851
URL: https://issues.apache.org/jira/browse/PROTON-1851
Project: Qpid Proton
Issue Type: Bug
Components: python-binding
Affects Versions: proton-c-0.22.0
Reporter: Jiri Daněk
If I specify multiple URLs with {{container.connect(urls=[...])}}, my client reconnects fine when a server fails. What does not seem to work for me is making the client send messages to the new server. I think PROTON-1515 may be related to my issue.
h4. Test case
Can be run as {{python -m unittest proton_tests.engine.ServerTest.testFailover}} in {{qpid-proton/tests/python}}
{code:title=tests/python/proton_tests/engine.py}
class ServerTest(Test):
    """Client failover test against two locally started test servers."""

    def testFailover(self):
        """Check that deliveries keep arriving (now at the second broker)
        once the first broker has been shut down."""
        primary = common.TestServer2()
        primary.start()
        backup = common.TestServer2()
        backup.start()
        print("testFailover")

        class Program(MessagingHandler):
            first = True

            def on_start(self, event):
                print("sender: on start")
                # Hand the container both broker addresses; the single-URL
                # form (url="host:port") was the variant tried before.
                broker_urls = [
                    "%s:%s" % (broker.host, broker.port)
                    for broker in (primary, backup)
                ]
                self.conn = event.container.connect(urls=broker_urls)

            def on_sendable(self, event):
                print("sender: on sendable")
                self.sender.send(Message())
                self.connection = event.connection

            def on_connection_opened(self, event):
                container = event.container
                self.sender = container.create_sender(self.conn, "some_address")
                print("sender: on connection opened")

        client = Container(Program())
        runner = Thread(target=client.run)
        runner.start()

        # Let the client talk to the first broker, then take it away.
        sleep(1)
        primary.stop()
        print("server 1 stopped")

        # Give the client time to send to the second broker before stopping it.
        sleep(1)
        backup.stop()
        sleep(1)
        client.stop()

        primary_count = len(primary.tags)
        backup_count = len(backup.tags)
        print("msgs: ", primary_count, backup_count)
        assert primary_count > 0
        assert backup_count > 0  # this assert fails, len == 0
{code}
{code:title=tests/python/proton_tests/common.py}
class TestServer2(MessagingHandler):
    """Base class for creating test-specific message servers.

    The server owns its own ``Container`` and runs it on a daemon thread.
    Every delivery it receives is settled and its tag is appended to
    ``self.tags``.

    Keyword arguments:
      host -- address to listen on (default ``"127.0.0.1"``).
      port -- port to listen on; ``0`` (the default) means "pick a random
              port in 49152-65535, retrying up to 10 times".
    """

    def __init__(self, **kwargs):
        super(TestServer2, self).__init__()
        self.args = kwargs
        self.reactor = Container(self)
        self.host = kwargs.get("host", "127.0.0.1")
        self.port = kwargs.get("port", 0)
        self.handlers = [CFlowController(10), CHandshaker()]
        self.thread = Thread(name="server-thread", target=self.run)
        self.thread.daemon = True
        self.running = True
        # Set by on_start() once listening succeeds; None until then so that
        # stop() can be called safely even if the listener never came up.
        self.acceptor = None
        self.tags = []

    def start(self):
        """Start the server thread."""
        self.thread.start()

    def stop(self):
        """Close the listener (if any), stop the container and join the thread."""
        self.running = False
        self.reactor.wakeup()
        if self.acceptor is not None:
            self.acceptor.close()
        self.reactor.stop()
        self.thread.join()

    # Note: all of the following methods run under the server thread:
    def run(self):
        """Thread target: run the container's event loop."""
        self.reactor.run()

    def on_start(self, event):
        """Start listening on ``self.host``/``self.port``.

        With ``port == 0`` a random port is chosen and up to 10 candidates
        are tried. An explicitly requested port gets exactly one attempt and
        is never replaced by a random one (previously an explicit port was
        never listened on at all, because the retry budget stayed at 0).

        Raises AssertionError if no listener could be created.
        """
        print("on start server")
        pick_random_port = self.port == 0
        retry = 1
        if pick_random_port:
            self.port = str(randint(49152, 65535))
            retry = 10
        while retry > 0:
            try:
                self.acceptor = event.container.listen(
                    Url("%s:%s" % (self.host, self.port)))
                break
            except IOError as e:
                print(e)
                if pick_random_port:
                    self.port = str(randint(49152, 65535))
                retry -= 1
        assert retry > 0, "No free port for server to listen on!"

    def on_delivery(self, event):
        """Settle the delivery and record its tag in ``self.tags``.

        :type event: proton.Event
        """
        print("on delivery")
        event.delivery.settle()
        self.tags.append(event.delivery.tag)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org