You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/07/08 22:45:47 UTC
svn commit: r1689957 - in /qpid/dispatch/trunk:
include/qpid/dispatch/container.h src/container.c src/router_node.c
Author: tross
Date: Wed Jul 8 20:45:46 2015
New Revision: 1689957
URL: http://svn.apache.org/r1689957
Log:
DISPATCH-149 - Improve link-loss handling in the container.
Remove the default clause in the event switch.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/router_node.c
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1689957&r1=1689956&r2=1689957&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Wed Jul 8 20:45:46 2015
@@ -60,12 +60,19 @@ typedef enum {
} qd_direction_t;
+typedef enum {
+ QD_DETACHED, // Protocol detach
+ QD_CLOSED, // Protocol close
+ QD_LOST // Connection or session closed
+} qd_detach_type_t;
+
+
typedef struct qd_node_t qd_node_t;
typedef struct qd_link_t qd_link_t;
typedef void (*qd_container_delivery_handler_t) (void *node_context, qd_link_t *link, pn_delivery_t *delivery);
typedef int (*qd_container_link_handler_t) (void *node_context, qd_link_t *link);
-typedef int (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, int closed);
+typedef int (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, qd_detach_type_t dt);
typedef void (*qd_container_node_handler_t) (void *type_context, qd_node_t *node);
typedef void (*qd_container_conn_handler_t) (void *type_context, qd_connection_t *conn, void *context);
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1689957&r1=1689956&r2=1689957&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Wed Jul 8 20:45:46 2015
@@ -101,8 +101,9 @@ static void setup_outgoing_link(qd_conta
if (container->default_node)
node = container->default_node;
else {
- // Reject the link
- // TODO - When the API allows, add an error message for "no available node"
+ pn_condition_t *cond = pn_link_condition(pn_link);
+ pn_condition_set_name(cond, "amqp:not-found");
+ pn_condition_set_description(cond, "Source node does not exist");
pn_link_close(pn_link);
pn_link_free(pn_link);
return;
@@ -111,6 +112,9 @@ static void setup_outgoing_link(qd_conta
qd_link_t *link = new_qd_link_t();
if (!link) {
+ pn_condition_t *cond = pn_link_condition(pn_link);
+ pn_condition_set_name(cond, "amqp:internal-error");
+ pn_condition_set_description(cond, "Insufficient memory");
pn_link_close(pn_link);
pn_link_free(pn_link);
return;
@@ -134,7 +138,6 @@ static void setup_incoming_link(qd_conta
qd_node_t *node = 0;
const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link));
qd_field_iterator_t *iter;
- // TODO - Extract the name from the structured target
if (target) {
iter = qd_address_iterator_string(target, ITER_VIEW_NODE_ID);
@@ -147,8 +150,9 @@ static void setup_incoming_link(qd_conta
if (container->default_node)
node = container->default_node;
else {
- // Reject the link
- // TODO - When the API allows, add an error message for "no available node"
+ pn_condition_t *cond = pn_link_condition(pn_link);
+ pn_condition_set_name(cond, "amqp:not-found");
+ pn_condition_set_description(cond, "Target node does not exist");
pn_link_close(pn_link);
pn_link_free(pn_link);
return;
@@ -157,6 +161,9 @@ static void setup_incoming_link(qd_conta
qd_link_t *link = new_qd_link_t();
if (!link) {
+ pn_condition_t *cond = pn_link_condition(pn_link);
+ pn_condition_set_name(cond, "amqp:internal-error");
+ pn_condition_set_description(cond, "Insufficient memory");
pn_link_close(pn_link);
pn_link_free(pn_link);
return;
@@ -239,20 +246,21 @@ static int close_handler(void* unused, p
qd_connection_manager_connection_closed(qd_conn);
//
- // Close all links, passing False as the 'closed' argument. These links are not
+ // Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
//
pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE);
+ pn_link_t *link_to_free;
while (pn_link) {
qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
if (link) {
qd_node_t *node = link->node;
- if (node)
- node->ntype->link_detach_handler(node->context, link, 0);
- link->pn_link = 0;
+ if (node) {
+ node->ntype->link_detach_handler(node->context, link, QD_LOST);
+ }
}
pn_link_close(pn_link);
- pn_link_t *link_to_free = pn_link;
+ link_to_free = pn_link;
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE);
pn_link_free(link_to_free);
}
@@ -285,11 +293,11 @@ static int writable_handler(void* unused
// deliveries are a subset of the traffic that flows both directions on links.
//
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) {
- pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ pn_link = pn_link_head(conn, 0);
while (pn_link) {
assert(pn_session_connection(pn_link_session(pn_link)) == conn);
event_count += do_writable(pn_link);
- pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
+ pn_link = pn_link_next(pn_link, 0);
}
}
return event_count;
@@ -342,11 +350,7 @@ int pn_event_handler(void *handler_conte
"Aborting link '%s' due to parent session end",
pn_link_name(pn_link));
qd_link->node->ntype->link_detach_handler(qd_link->node->context,
- qd_link, 1); // assume
- // closed?
- qd_link->pn_link = 0;
- pn_link_close(pn_link);
- pn_link_free(pn_link);
+ qd_link, QD_LOST);
}
}
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
@@ -368,16 +372,17 @@ int pn_event_handler(void *handler_conte
break;
case PN_LINK_REMOTE_CLOSE :
+ case PN_LINK_REMOTE_DETACH :
pn_link = pn_event_link(event);
- if (pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
- qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ if (qd_link) {
qd_node_t *node = qd_link->node;
+ qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (node)
- node->ntype->link_detach_handler(node->context, qd_link, 1); // TODO - get 'closed' from detach message
- qd_link->pn_link = 0;
- pn_link_close(pn_link);
- pn_link_free(pn_link);
+ node->ntype->link_detach_handler(node->context, qd_link, dt);
}
+ pn_link_close(pn_link);
+ pn_link_free(pn_link);
break;
case PN_LINK_FINAL :
@@ -403,7 +408,37 @@ int pn_event_handler(void *handler_conte
}
break;
- default :
+ case PN_EVENT_NONE :
+ case PN_REACTOR_INIT :
+ case PN_REACTOR_QUIESCED :
+ case PN_REACTOR_FINAL :
+ case PN_TIMER_TASK :
+ case PN_CONNECTION_INIT :
+ case PN_CONNECTION_BOUND :
+ case PN_CONNECTION_UNBOUND :
+ case PN_CONNECTION_LOCAL_OPEN :
+ case PN_CONNECTION_LOCAL_CLOSE :
+ case PN_CONNECTION_FINAL :
+ case PN_SESSION_INIT :
+ case PN_SESSION_LOCAL_OPEN :
+ case PN_SESSION_LOCAL_CLOSE :
+ case PN_SESSION_FINAL :
+ case PN_LINK_INIT :
+ case PN_LINK_LOCAL_OPEN :
+ case PN_LINK_LOCAL_CLOSE :
+ case PN_LINK_LOCAL_DETACH :
+ case PN_TRANSPORT :
+ case PN_TRANSPORT_ERROR :
+ case PN_TRANSPORT_HEAD_CLOSED :
+ case PN_TRANSPORT_TAIL_CLOSED :
+ case PN_TRANSPORT_CLOSED :
+ case PN_SELECTABLE_INIT :
+ case PN_SELECTABLE_UPDATED :
+ case PN_SELECTABLE_READABLE :
+ case PN_SELECTABLE_WRITABLE :
+ case PN_SELECTABLE_ERROR :
+ case PN_SELECTABLE_EXPIRED :
+ case PN_SELECTABLE_FINAL :
break;
}
@@ -764,15 +799,15 @@ pn_terminus_t *qd_link_remote_target(qd_
void qd_link_activate(qd_link_t *link)
{
- if (!link || !link->pn_link || pn_link_state(link->pn_link) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE))
+ if (!link || !link->pn_link)
return;
pn_session_t *sess = pn_link_session(link->pn_link);
- if (!sess || pn_session_state(sess) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE))
+ if (!sess)
return;
pn_connection_t *conn = pn_session_connection(sess);
- if (!conn || pn_connection_state(conn) != (PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE))
+ if (!conn)
return;
qd_connection_t *ctx = pn_connection_get_context(conn);
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1689957&r1=1689956&r2=1689957&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Wed Jul 8 20:45:46 2015
@@ -1536,7 +1536,7 @@ static int router_link_flow_handler(void
/**
* Link Detached Handler
*/
-static int router_link_detach_handler(void* context, qd_link_t *link, int closed)
+static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_type_t dt)
{
qd_router_t *router = (qd_router_t*) context;
qd_router_link_t *rlink = (qd_router_link_t*) qd_link_get_context(link);
@@ -1570,7 +1570,7 @@ static int router_link_detach_handler(vo
ld->condition_info = pn_data(0);
pn_data_copy(ld->condition_info, pn_condition_info(cond));
}
- } else if (!closed) {
+ } else if (dt == QD_LOST) {
strcpy(ld->condition_name, "qd:routed-link-lost");
strcpy(ld->condition_description, "Connectivity to the peer container was lost");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org