You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Jiri Daněk (JIRA)" <ji...@apache.org> on 2018/05/23 11:30:00 UTC

[jira] [Created] (PROTON-1851) [python] Unable to send messages to newly connected server after reconnect has happened

Jiri Daněk created PROTON-1851:
----------------------------------

             Summary: [python] Unable to send messages to newly connected server after reconnect has happened
                 Key: PROTON-1851
                 URL: https://issues.apache.org/jira/browse/PROTON-1851
             Project: Qpid Proton
          Issue Type: Bug
          Components: python-binding
    Affects Versions: proton-c-0.22.0
            Reporter: Jiri Daněk


If I specify multiple URLs via {{container.connect(urls=[...])}}, my client reconnects fine when a server fails. What does not seem to work for me is making the client send messages to the new server. I think PROTON-1515 may be related to my issue.

h4. Test case

Can be run as {{python -m unittest proton_tests.engine.ServerTest.testFailover}} in {{qpid-proton/tests/python}}

{code:title=tests/python/proton_tests/engine.py}
class ServerTest(Test):
  # Reproducer for PROTON-1851: a client that connects with a list of URLs
  # is expected to keep delivering messages after the first server goes away.

  def testFailover(self):
    """ Verify that messages continue to be delivered
    (to the new broker) after failover happens.

    Starts two TestServer2 instances, runs a sending client Container in a
    background thread, stops the first server, then the second, and finally
    asserts that each server recorded at least one delivery tag.
    """
    server1 = common.TestServer2()
    server1.start()
    server2 = common.TestServer2()
    server2.start()
    print("testFailover")

    class Program(MessagingHandler):
      # Client-side handler: connects using both server addresses and sends
      # an empty Message each time on_sendable is called.
      first = True  # not read anywhere in this snippet

      def on_start(self, event):
        print("sender: on start")
        # Single-URL variant kept for comparison with the failover variant below.
        # self.conn = event.container.connect(url="%s:%s" % (server1.host, server1.port))  # , allowed_mechs="ANONYMOUS")
        # NOTE(review): host/port are read from the server objects here; the
        # servers pick their port in their own on_start on another thread and
        # nothing in this test waits for that explicitly -- confirm the ports
        # are already assigned when this handler runs.
        self.conn = event.container.connect(urls=["%s:%s" % (server1.host, server1.port), "%s:%s" % (server2.host, server2.port)])

      def on_sendable(self, event):
        print("sender: on sendable")
        message = Message()
        # Always sends on the sender most recently created in
        # on_connection_opened, not on a link taken from the event.
        self.sender.send(message)
        self.connection = event.connection  # stored but not used in this snippet

      def on_connection_opened(self, event):
        # Creates a sender for "some_address" on the connection object that
        # connect() returned in on_start.
        # NOTE(review): presumably this also runs after a reconnect, creating
        # another sender on the same connection object -- confirm.
        self.sender = event.container.create_sender(self.conn, "some_address")
        print("sender: on connection opened")

    p = Program()
    c = Container(p)
    # Run the client container in its own thread so this test can stop the
    # servers while the client is sending.
    t = Thread(target=c.run)
    t.start()
    sleep(1)  # give the client time to deliver to server1
    server1.stop()
    print("server 1 stopped")
    sleep(1)  # give the client time to fail over and deliver to server2
    server2.stop()
    sleep(1)
    c.stop()
    # Each server appends one tag per delivery it receives.
    print("msgs: ", len(server1.tags), len(server2.tags))
    assert len(server1.tags) > 0
    assert len(server2.tags) > 0  # this assert fails, len == 0
{code}

{code:title=tests/python/proton_tests/common.py}
class TestServer2(MessagingHandler):
  """ Base class for creating test-specific message servers.

  The server runs its own Container in a daemon thread, listens on
  ``host:port`` and records the tag of every delivery it receives in
  ``self.tags``.

  Keyword arguments:
    host -- address to listen on (default "127.0.0.1")
    port -- port to listen on; 0 (the default) means "pick a random port
            in 49152-65535, retrying up to 10 times"
  """
  def __init__(self, **kwargs):
    super(TestServer2, self).__init__()
    self.args = kwargs
    self.reactor = Container(self)
    self.host = kwargs.get("host", "127.0.0.1")
    self.port = kwargs.get("port", 0)
    self.handlers = [CFlowController(10), CHandshaker()]
    self.thread = Thread(name="server-thread", target=self.run)
    self.thread.daemon = True
    self.running = True
    self.tags = []
    # Assigned in on_start once listening succeeds; kept as None until then
    # so stop() can be called safely before the server has started listening.
    self.acceptor = None

  def start(self):
    """Start the server thread; listening begins in on_start."""
    self.thread.start()

  def stop(self):
    """Close the listener, stop the container and join the server thread."""
    self.running = False
    self.reactor.wakeup()
    if self.acceptor is not None:
      self.acceptor.close()
    self.reactor.stop()
    self.thread.join()

  # Note: all following methods all run under the thread:

  def run(self):
    """Thread target: run the container until it is stopped."""
    self.reactor.run()

  def on_start(self, event):
    """Start listening on self.host:self.port.

    With the default port of 0 a random port is chosen and up to 10
    different ports are tried. An explicitly requested port is tried
    exactly once (previously it was never tried at all, because the retry
    budget stayed at 0 and the loop was skipped).

    Raises AssertionError when no attempt succeeds.
    """
    print("on start server")
    random_port = self.port == 0
    retry = 10 if random_port else 1
    if random_port:
      self.port = str(randint(49152, 65535))
    while retry > 0:
      try:
        # self.acceptor = self.reactor.acceptor(self.host, self.port)
        self.acceptor = event.container.listen(Url("%s:%s" % (self.host, self.port)))
        break
      except IOError as e:
        print(e)
        if random_port:
          # Only re-roll the port when the caller did not ask for a specific one.
          self.port = str(randint(49152, 65535))
        retry -= 1
    assert retry > 0, "No free port for server to listen on!"

  def on_delivery(self, event):
    """Settle the incoming delivery and record its tag.

    :type event: proton.Event
    """
    print("on delivery")
    event.delivery.settle()
    self.tags.append(event.delivery.tag)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org