You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/11/02 16:05:07 UTC
svn commit: r1404992 - in /qpid/proton/trunk/proton-c/src:
engine/engine-internal.h engine/engine.c util.h
Author: rhs
Date: Fri Nov 2 15:05:07 2012
New Revision: 1404992
URL: http://svn.apache.org/viewvc?rev=1404992&view=rev
Log:
moved resize macros to util.h; fixed bug in unsettled list; added simple batching of dispositions
Modified:
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/util.h
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Fri Nov 2 15:05:07 2012
@@ -82,6 +82,12 @@ typedef struct {
size_t link_capacity;
pn_link_state_t **handles;
size_t handle_capacity;
+
+ uint64_t disp_code;
+ bool disp_settled;
+ bool disp_type;
+ pn_sequence_t disp_first;
+ pn_sequence_t disp_last;
} pn_session_state_t;
#define SCRATCH (1024)
@@ -216,20 +222,6 @@ struct pn_delivery_t {
void pn_link_dump(pn_link_t *link);
-#define PN_ENSURE(ARRAY, CAPACITY, COUNT) \
- while ((CAPACITY) < (COUNT)) { \
- (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16; \
- (ARRAY) = realloc((ARRAY), (CAPACITY) * sizeof (*(ARRAY))); \
- } \
-
-#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT) \
- { \
- size_t _old_capacity = (CAPACITY); \
- PN_ENSURE((ARRAY), (CAPACITY), (COUNT)); \
- memset((ARRAY) + _old_capacity, 0, \
- sizeof(*(ARRAY))*((CAPACITY) - _old_capacity)); \
- }
-
void pn_dump(pn_connection_t *conn);
void pn_transport_sasl_init(pn_transport_t *transport);
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Fri Nov 2 15:05:07 2012
@@ -1151,12 +1151,20 @@ int pn_link_unsettled(pn_link_t *link)
pn_delivery_t *pn_unsettled_head(pn_link_t *link)
{
- return link->unsettled_head;
+  // Return the first delivery on the link's unsettled list that has not been
+  // settled locally, or NULL when every entry is locally settled (or the
+  // list is empty).
+  pn_delivery_t *cursor;
+  for (cursor = link->unsettled_head; cursor; cursor = cursor->unsettled_next) {
+    if (!cursor->local_settled) break;
+  }
+  return cursor;
}
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
{
- return delivery->unsettled_next;
+  // Return the next delivery after `delivery` on the unsettled list that has
+  // not been settled locally, or NULL when no such entry remains.
+  pn_delivery_t *cursor;
+  for (cursor = delivery->unsettled_next; cursor; cursor = cursor->unsettled_next) {
+    if (!cursor->local_settled) break;
+  }
+  return cursor;
}
bool pn_is_current(pn_delivery_t *delivery)
@@ -1982,10 +1990,29 @@ int pn_process_flow_receiver(pn_transpor
return 0;
}
+// Send the disposition batch recorded on ssn_state as a single DISPOSITION
+// frame covering [disp_first, disp_last], then reset the batch fields.
+//
+// Nothing is sent when the batch carries neither an outcome code nor a
+// settled flag. Returns 0 on success (including the nothing-to-send case),
+// or the error from pn_post_frame, in which case the batch is left intact.
+int pn_flush_disp(pn_transport_t *transport, pn_session_state_t *ssn_state)
+{
+  uint64_t code = ssn_state->disp_code;
+  // Keep this a bool: it is passed through varargs into an 'o' slot of the
+  // format string, where the pre-batching code passed a bool (promoted to
+  // int). A uint64_t here can misalign every later vararg on ABIs that pass
+  // int and 64-bit integers differently.
+  bool settled = ssn_state->disp_settled;
+  if (!code && !settled) return 0;
+
+  int err = pn_post_frame(transport->disp, ssn_state->local_channel, "DL[oIIo?DL[]]", DISPOSITION,
+                          ssn_state->disp_type, ssn_state->disp_first, ssn_state->disp_last,
+                          settled, (bool)code, code);
+  if (err) return err;
+
+  // Clear the batch so the next disposition starts a fresh range.
+  ssn_state->disp_type = false;
+  ssn_state->disp_code = 0;
+  ssn_state->disp_settled = false;
+  ssn_state->disp_first = 0;
+  ssn_state->disp_last = 0;
+  return 0;
+}
+
int pn_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) // Folds this delivery's disposition into the session's pending batch; returns 0 or the error from flushing the previous batch.
{
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session); // holds the pending batch (disp_code/disp_settled/disp_type/disp_first/disp_last)
+ pn_modified(transport->connection, &link->session->endpoint); // NOTE(review): presumably schedules the session endpoint so the batch is flushed in a later processing phase - confirm
// XXX: check for null state
pn_delivery_state_t *state = delivery->transport_context;
uint64_t code;
@@ -1996,16 +2023,34 @@ int pn_post_disp(pn_transport_t *transpo
case PN_RELEASED:
code = RELEASED;
break;
+ case PN_REJECTED:
+ code = REJECTED;
+ break;
//TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
default:
code = 0; // any other local state carries no outcome code
}
- if (code || delivery->local_settled)
- return pn_post_frame(transport->disp, ssn_state->local_channel, "DL[oIIo?DL[]]", DISPOSITION,
- link->endpoint.type == RECEIVER, state->id, state->id,
- delivery->local_settled,
- (bool)code, code);
+ if (code == ssn_state->disp_code && delivery->local_settled == ssn_state->disp_settled && // mergeable only if outcome code, settled flag...
+ ssn_state->disp_type == (link->endpoint.type == RECEIVER)) { // ...and role all match the pending batch
+ if (state->id == ssn_state->disp_first - 1) { // id sits just below the batch: extend the range downward
+ ssn_state->disp_first = state->id;
+ return 0; // absorbed; nothing is sent until the batch is flushed
+ } else if (state->id == ssn_state->disp_last + 1) { // id sits just above the batch: extend the range upward
+ ssn_state->disp_last = state->id;
+ return 0; // absorbed; nothing is sent until the batch is flushed
+ }
+ }
+
+ int err = pn_flush_disp(transport, ssn_state); // not mergeable: send whatever batch is pending first
+ if (err) return err;
+
+ ssn_state->disp_type = (link->endpoint.type == RECEIVER); // start a new batch containing only this delivery
+ ssn_state->disp_code = code;
+ ssn_state->disp_settled = delivery->local_settled;
+ ssn_state->disp_first = state->id;
+ ssn_state->disp_last = state->id;
+
return 0;
}
@@ -2112,6 +2157,21 @@ int pn_process_tpwork(pn_transport_t *tr
return 0;
}
+// Processing phase callback: for a session endpoint, flush the disposition
+// batch pending on its transport state. Endpoints of any other type are
+// ignored. Returns 0, or the error reported by pn_flush_disp.
+int pn_process_flush_disp(pn_transport_t *transport, pn_endpoint_t *endpoint)
+{
+  if (endpoint->type != SESSION) return 0;
+
+  pn_session_state_t *ssn_state = pn_session_get_state(transport, (pn_session_t *) endpoint);
+
+  // Presumably a negative value (after the int16_t cast) marks a session
+  // with no local channel yet - confirm. Nothing is flushed in that case,
+  // nor once the transport has sent its close.
+  bool channel_ok = (int16_t) ssn_state->local_channel >= 0;
+  if (!channel_ok || transport->close_sent) return 0;
+
+  return pn_flush_disp(transport, ssn_state);
+}
+
int pn_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint)
{
if (endpoint->type == SENDER && endpoint->state & PN_LOCAL_ACTIVE)
@@ -2251,6 +2311,8 @@ int pn_process(pn_transport_t *transport
if ((err = pn_phase(transport, pn_process_tpwork))) return err;
if ((err = pn_phase(transport, pn_process_tpwork))) return err;
+ if ((err = pn_phase(transport, pn_process_flush_disp))) return err;
+
if ((err = pn_phase(transport, pn_process_flow_sender))) return err;
if ((err = pn_phase(transport, pn_process_link_teardown))) return err;
if ((err = pn_phase(transport, pn_process_ssn_teardown))) return err;
Modified: qpid/proton/trunk/proton-c/src/util.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/util.h?rev=1404992&r1=1404991&r2=1404992&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/util.h (original)
+++ qpid/proton/trunk/proton-c/src/util.h Fri Nov 2 15:05:07 2012
@@ -95,4 +95,18 @@ char *pn_strndup(const char *src, size_t
#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
+#define PN_ENSURE(ARRAY, CAPACITY, COUNT) \
+ while ((CAPACITY) < (COUNT)) { \
+ (CAPACITY) = (CAPACITY) ? 2 * (CAPACITY) : 16; \
+ (ARRAY) = realloc((ARRAY), (CAPACITY) * sizeof (*(ARRAY))); \
+ } \
+
+#define PN_ENSUREZ(ARRAY, CAPACITY, COUNT) \
+ { \
+ size_t _old_capacity = (CAPACITY); \
+ PN_ENSURE((ARRAY), (CAPACITY), (COUNT)); \
+ memset((ARRAY) + _old_capacity, 0, \
+ sizeof(*(ARRAY))*((CAPACITY) - _old_capacity)); \
+ }
+
#endif /* util.h */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org