You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2014/09/04 21:24:52 UTC

svn commit: r1622536 - in /qpid/proton/trunk: proton-c/src/dispatcher/dispatcher.c proton-c/src/messenger/messenger.c tests/python/proton_tests/messenger.py

Author: astitcher
Date: Thu Sep  4 19:24:52 2014
New Revision: 1622536

URL: http://svn.apache.org/r1622536
Log:
PROTON-610: Messenger code doesn't send heartbeat frames

Modified:
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Thu Sep  4 19:24:52 2014
@@ -116,6 +116,10 @@ static void pn_do_trace(pn_dispatcher_t 
     pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-");
     pn_inspect(args, disp->scratch);
 
+    if (pn_data_size(args)==0) {
+        pn_string_addf(disp->scratch, "(EMPTY FRAME)");
+    }
+
     if (size) {
       char buf[1024];
       int e = pn_quote_data(buf, 1024, payload, size);

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Sep  4 19:24:52 2014
@@ -31,6 +31,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
+
 #include "../util.h"
 #include "../platform.h"
 #include "../platform_fmt.h"
@@ -1275,8 +1276,37 @@ int pn_messenger_process_events(pn_messe
   return processed;
 }
 
+/**
+ * Function to invoke AMQP related timer events, such as a heartbeat to prevent
+ * remote_idle timeout events
+ */
+static void pni_messenger_tick(pn_messenger_t *messenger)
+{
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+    pn_connection_t *connection =
+        (pn_connection_t *)pn_list_get(messenger->connections, i);
+    pn_transport_t *transport = pn_connection_transport(connection);
+    if (transport) {
+      pn_transport_tick(transport, pn_i_now());
+
+      // if there is pending data, such as an empty heartbeat frame, call
+      // process events. This should kick off the chain of selectables for
+      // reading/writing.
+      ssize_t pending = pn_transport_pending(transport);
+      if (pending > 0) {
+        pn_connection_ctx_t *cctx =
+            (pn_connection_ctx_t *)pn_connection_get_context(connection);
+        pn_messenger_process_events(messenger);
+        pn_messenger_flow(messenger);
+        pni_conn_modified(pni_context(cctx->selectable));
+      }
+    }
+  }
+}
+
 int pn_messenger_process(pn_messenger_t *messenger)
 {
+  bool doMessengerTick = true;
   pn_selectable_t *sel;
   int events;
   while ((sel = pn_selector_next(messenger->selector, &events))) {
@@ -1285,12 +1315,17 @@ int pn_messenger_process(pn_messenger_t 
     }
     if (events & PN_WRITABLE) {
       pn_selectable_writable(sel);
+      doMessengerTick = false;
     }
     if (events & PN_EXPIRED) {
       pn_selectable_expired(sel);
     }
   }
-
+  // ensure timer events are processed. Cannot call this inside the while loop
+  // as the timer events are not seen by the selector
+  if (doMessengerTick) {
+    pni_messenger_tick(messenger);
+  }
   if (messenger->interrupted) {
     messenger->interrupted = false;
     return PN_INTR;

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Sep  4 19:24:52 2014
@@ -1027,3 +1027,59 @@ class SelectableMessengerTest(common.Tes
 
   def testSelectable4096(self):
     self.testSelectable(count=4096)
+
+
+class IdleTimeoutTest(common.Test):
+
+  def testIdleTimeout(self):
+    """
+    Verify that a Messenger connection is kept alive using empty idle frames
+    when a idle_timeout is advertised by the remote peer.
+    """
+    if "java" in sys.platform:
+      raise Skipped()
+    idle_timeout_secs = self.delay
+
+    try:
+      idle_server = common.TestServerDrain(idle_timeout=idle_timeout_secs)
+      idle_server.timeout = self.timeout
+      idle_server.start()
+
+      idle_client = Messenger("idle_client")
+      idle_client.timeout = self.timeout
+      idle_client.start()
+
+      idle_client.subscribe("amqp://%s:%s/foo" %
+                            (idle_server.host, idle_server.port))
+      idle_client.work(idle_timeout_secs/10)
+
+      # wait up to 3x the idle timeout and hence verify that everything stays
+      # connected during that time by virtue of no Exception being raised
+      duration = 3 * idle_timeout_secs
+      deadline = time() + duration
+      while time() <= deadline:
+        idle_client.work(idle_timeout_secs/10)
+        continue
+
+      # confirm link is still active
+      cxtr = idle_server.driver.head_connector()
+      assert not cxtr.closed, "Connector has unexpectedly been closed"
+      conn = cxtr.connection
+      assert conn.state == (Endpoint.LOCAL_ACTIVE
+                            | Endpoint.REMOTE_ACTIVE
+                            ), "Connection has unexpectedly terminated"
+      link = conn.link_head(0)
+      while link:
+        assert link.state != (Endpoint.REMOTE_CLOSED
+                              ), "Link unexpectedly closed"
+        link = link.next(0)
+
+    finally:
+      try:
+        idle_client.stop()
+      except:
+        pass
+      try:
+        idle_server.stop()
+      except:
+        pass



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org