You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2014/09/04 21:24:52 UTC
svn commit: r1622536 - in /qpid/proton/trunk:
proton-c/src/dispatcher/dispatcher.c proton-c/src/messenger/messenger.c
tests/python/proton_tests/messenger.py
Author: astitcher
Date: Thu Sep 4 19:24:52 2014
New Revision: 1622536
URL: http://svn.apache.org/r1622536
Log:
PROTON-610: Messenger code doesn't send heartbeat frames
Modified:
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Thu Sep 4 19:24:52 2014
@@ -116,6 +116,10 @@ static void pn_do_trace(pn_dispatcher_t
pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-");
pn_inspect(args, disp->scratch);
+ if (pn_data_size(args)==0) {
+ pn_string_addf(disp->scratch, "(EMPTY FRAME)");
+ }
+
if (size) {
char buf[1024];
int e = pn_quote_data(buf, 1024, payload, size);
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Sep 4 19:24:52 2014
@@ -31,6 +31,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
+
#include "../util.h"
#include "../platform.h"
#include "../platform_fmt.h"
@@ -1275,8 +1276,37 @@ int pn_messenger_process_events(pn_messe
return processed;
}
+/**
+ * Function to invoke AMQP related timer events, such as a heartbeat to prevent
+ * remote_idle timeout events
+ */
+static void pni_messenger_tick(pn_messenger_t *messenger)
+{
+ for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+ pn_connection_t *connection =
+ (pn_connection_t *)pn_list_get(messenger->connections, i);
+ pn_transport_t *transport = pn_connection_transport(connection);
+ if (transport) {
+ pn_transport_tick(transport, pn_i_now());
+
+ // if there is pending data, such as an empty heartbeat frame, call
+ // process events. This should kick off the chain of selectables for
+ // reading/writing.
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ pn_connection_ctx_t *cctx =
+ (pn_connection_ctx_t *)pn_connection_get_context(connection);
+ pn_messenger_process_events(messenger);
+ pn_messenger_flow(messenger);
+ pni_conn_modified(pni_context(cctx->selectable));
+ }
+ }
+ }
+}
+
int pn_messenger_process(pn_messenger_t *messenger)
{
+ bool doMessengerTick = true;
pn_selectable_t *sel;
int events;
while ((sel = pn_selector_next(messenger->selector, &events))) {
@@ -1285,12 +1315,17 @@ int pn_messenger_process(pn_messenger_t
}
if (events & PN_WRITABLE) {
pn_selectable_writable(sel);
+ doMessengerTick = false;
}
if (events & PN_EXPIRED) {
pn_selectable_expired(sel);
}
}
-
+ // ensure timer events are processed. Cannot call this inside the while loop
+ // as the timer events are not seen by the selector
+ if (doMessengerTick) {
+ pni_messenger_tick(messenger);
+ }
if (messenger->interrupted) {
messenger->interrupted = false;
return PN_INTR;
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1622536&r1=1622535&r2=1622536&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Sep 4 19:24:52 2014
@@ -1027,3 +1027,59 @@ class SelectableMessengerTest(common.Tes
def testSelectable4096(self):
self.testSelectable(count=4096)
+
+
+class IdleTimeoutTest(common.Test):
+
+ def testIdleTimeout(self):
+ """
+ Verify that a Messenger connection is kept alive using empty idle frames
+ when a idle_timeout is advertised by the remote peer.
+ """
+ if "java" in sys.platform:
+ raise Skipped()
+ idle_timeout_secs = self.delay
+
+ try:
+ idle_server = common.TestServerDrain(idle_timeout=idle_timeout_secs)
+ idle_server.timeout = self.timeout
+ idle_server.start()
+
+ idle_client = Messenger("idle_client")
+ idle_client.timeout = self.timeout
+ idle_client.start()
+
+ idle_client.subscribe("amqp://%s:%s/foo" %
+ (idle_server.host, idle_server.port))
+ idle_client.work(idle_timeout_secs/10)
+
+ # wait up to 3x the idle timeout and hence verify that everything stays
+ # connected during that time by virtue of no Exception being raised
+ duration = 3 * idle_timeout_secs
+ deadline = time() + duration
+ while time() <= deadline:
+ idle_client.work(idle_timeout_secs/10)
+ continue
+
+ # confirm link is still active
+ cxtr = idle_server.driver.head_connector()
+ assert not cxtr.closed, "Connector has unexpectedly been closed"
+ conn = cxtr.connection
+ assert conn.state == (Endpoint.LOCAL_ACTIVE
+ | Endpoint.REMOTE_ACTIVE
+ ), "Connection has unexpectedly terminated"
+ link = conn.link_head(0)
+ while link:
+ assert link.state != (Endpoint.REMOTE_CLOSED
+ ), "Link unexpectedly closed"
+ link = link.next(0)
+
+ finally:
+ try:
+ idle_client.stop()
+ except:
+ pass
+ try:
+ idle_server.stop()
+ except:
+ pass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org