You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/06/11 11:41:15 UTC
svn commit: r1491719 - in /qpid/proton/trunk:
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/dispatcher/
proton-c/src/engine/ proton-c/src/messenger/ proton-j/prot...
Author: rhs
Date: Tue Jun 11 09:41:15 2013
New Revision: 1491719
URL: http://svn.apache.org/r1491719
Log:
PROTON-331: fixed logic for session window, removed fixed limit, added tests
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/tests/python/proton_tests/engine.py
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Tue Jun 11 09:41:15 2013
@@ -250,4 +250,11 @@ public class JNIDelivery implements Deli
}
super.finalize();
}
+
+ @ProtonCEquivalent("pn_delivery_pending")
+ public int pending()
+ {
+ return (int) Proton.pn_delivery_pending(_impl);
+ }
+
}
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Jun 11 09:41:15 2013
@@ -1993,6 +1993,22 @@ class Session(Endpoint):
def _get_remote_cond_impl(self):
return pn_session_remote_condition(self._ssn)
+ def _get_incoming_capacity(self):
+ return pn_session_get_incoming_capacity(self._ssn)
+
+ def _set_incoming_capacity(self, capacity):
+ pn_session_set_incoming_capacity(self._ssn, capacity)
+
+ incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
+
+ @property
+ def outgoing_bytes(self):
+ return pn_session_outgoing_bytes(self._ssn)
+
+ @property
+ def incoming_bytes(self):
+ return pn_session_incoming_bytes(self._ssn)
+
def open(self):
pn_session_open(self._ssn)
@@ -2264,6 +2280,14 @@ class Delivery(object):
pn_delivery_update(self._dlv, state)
@property
+ def pending(self):
+ return pn_delivery_pending(self._dlv)
+
+ @property
+ def partial(self):
+ return pn_delivery_partial(self._dlv)
+
+ @property
def local_state(self):
return pn_delivery_local_state(self._dlv)
@@ -2672,7 +2696,6 @@ class Driver(object):
__all__ = [
"API_LANGUAGE",
"IMPLEMENTATION_LANGUAGE",
- "PN_SESSION_WINDOW",
"ACCEPTED",
"AUTOMATIC",
"PENDING",
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Jun 11 09:41:15 2013
@@ -111,8 +111,6 @@ typedef int pn_trace_t;
#define PN_TRACE_FRM (2)
#define PN_TRACE_DRV (4)
-#define PN_SESSION_WINDOW (1024)
-
// connection
/** Factory to construct a new Connection.
@@ -182,6 +180,12 @@ PN_EXTERN pn_delivery_t *pn_work_next(pn
*/
PN_EXTERN pn_session_t *pn_session(pn_connection_t *connection);
+PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *ssn);
+PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity);
+
+PN_EXTERN size_t pn_session_outgoing_bytes(pn_session_t *ssn);
+PN_EXTERN size_t pn_session_incoming_bytes(pn_session_t *ssn);
+
/** Factory for creating a transport.
*
* A transport to be used by a connection to interface with the
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Tue Jun 11 09:41:15 2013
@@ -266,9 +266,11 @@ int pn_post_transfer_frame(pn_dispatcher
const pn_bytes_t *tag,
uint32_t message_format,
bool settled,
- bool more)
+ bool more,
+ pn_sequence_t frame_limit)
{
bool more_flag = more;
+ int framecount = 0;
// create preformatives, assuming 'more' flag need not change
@@ -323,7 +325,7 @@ int pn_post_transfer_frame(pn_dispatcher
goto encode_performatives;
}
- pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size);
+ pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, available);
memmove( buf.start + buf.size, disp->output_payload, available);
disp->output_payload += available;
@@ -342,14 +344,15 @@ int pn_post_transfer_frame(pn_dispatcher
disp->output = (char *) realloc(disp->output, disp->capacity);
}
disp->output_frames_ct += 1;
+ framecount++;
if (disp->trace & PN_TRACE_RAW) {
fprintf(stderr, "RAW: \"");
pn_fprint_data(stderr, disp->output + disp->available, n);
fprintf(stderr, "\"\n");
}
disp->available += n;
- } while (disp->output_size > 0);
+ } while (disp->output_size > 0 && framecount < frame_limit);
disp->output_payload = NULL;
- return 0;
+ return framecount;
}
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Tue Jun 11 09:41:15 2013
@@ -80,5 +80,6 @@ int pn_post_transfer_frame(pn_dispatcher
const pn_bytes_t *delivery_tag,
uint32_t message_format,
bool settled,
- bool more);
+ bool more,
+ pn_sequence_t frame_limit);
#endif /* dispatcher.h */
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jun 11 09:41:15 2013
@@ -62,7 +62,6 @@ typedef struct {
} pn_delivery_state_t;
typedef struct {
- size_t capacity;
pn_sequence_t next;
pn_hash_t *deliveries;
} pn_delivery_map_t;
@@ -84,6 +83,7 @@ typedef struct {
pn_delivery_map_t outgoing;
pn_sequence_t incoming_transfer_count;
pn_sequence_t incoming_window;
+ pn_sequence_t remote_incoming_window;
pn_sequence_t outgoing_transfer_count;
pn_sequence_t outgoing_window;
pn_hash_t *local_handles;
@@ -194,6 +194,11 @@ struct pn_session_t {
pn_connection_t *connection;
pn_list_t *links;
void *context;
+ size_t incoming_capacity;
+ pn_sequence_t incoming_bytes;
+ pn_sequence_t outgoing_bytes;
+ pn_sequence_t incoming_deliveries;
+ pn_sequence_t outgoing_deliveries;
pn_session_state_t state;
};
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jun 11 09:41:15 2013
@@ -37,10 +37,9 @@ static ssize_t transport_consume(pn_tran
// delivery buffers
-void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next, size_t capacity)
+void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
{
- db->deliveries = pn_hash(capacity, 0.75, PN_REFCOUNT);
- db->capacity = capacity;
+ db->deliveries = pn_hash(1024, 0.75, PN_REFCOUNT);
db->next = next;
}
@@ -49,11 +48,6 @@ void pn_delivery_map_free(pn_delivery_ma
pn_free(db->deliveries);
}
-size_t pn_delivery_map_available(pn_delivery_map_t *db)
-{
- return db->capacity - pn_hash_size(db->deliveries);
-}
-
pn_delivery_t *pn_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id)
{
return (pn_delivery_t *) pn_hash_get(db->deliveries, id);
@@ -68,8 +62,6 @@ static void pn_delivery_state_init(pn_de
pn_delivery_state_t *pn_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t *delivery)
{
- if (!pn_delivery_map_available(db))
- return NULL;
pn_delivery_state_t *ds = &delivery->state;
pn_delivery_state_init(ds, delivery, db->next++);
pn_hash_put(db->deliveries, ds->id, delivery);
@@ -628,13 +620,18 @@ pn_session_t *pn_session(pn_connection_t
pn_decref(ssn);
ssn->links = pn_list(0, PN_REFCOUNT);
ssn->context = 0;
+ ssn->incoming_capacity = 1024*1024;
+ ssn->incoming_bytes = 0;
+ ssn->outgoing_bytes = 0;
+ ssn->incoming_deliveries = 0;
+ ssn->outgoing_deliveries = 0;
// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
ssn->state.local_channel = (uint16_t)-1;
ssn->state.remote_channel = (uint16_t)-1;
- pn_delivery_map_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
- pn_delivery_map_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
+ pn_delivery_map_init(&ssn->state.incoming, 0);
+ pn_delivery_map_init(&ssn->state.outgoing, 0);
ssn->state.local_handles = pn_hash(0, 0.75, PN_REFCOUNT);
ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT);
// end transport state
@@ -642,6 +639,55 @@ pn_session_t *pn_session(pn_connection_t
return ssn;
}
+size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
+{
+ assert(ssn);
+ return ssn->incoming_capacity;
+}
+
+void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
+{
+ assert(ssn);
+ // XXX: should this trigger a flow?
+ ssn->incoming_capacity = capacity;
+}
+
+size_t pn_session_outgoing_bytes(pn_session_t *ssn)
+{
+ assert(ssn);
+ return ssn->outgoing_bytes;
+}
+
+size_t pn_session_incoming_bytes(pn_session_t *ssn)
+{
+ assert(ssn);
+ return ssn->incoming_bytes;
+}
+
+size_t pn_session_outgoing_window(pn_session_t *ssn)
+{
+ uint32_t size = ssn->connection->transport->remote_max_frame;
+ if (!size) {
+ return ssn->outgoing_deliveries;
+ } else {
+ pn_sequence_t frames = ssn->outgoing_bytes/size;
+ if (ssn->outgoing_bytes % size) {
+ frames++;
+ }
+ return pn_max(frames, ssn->outgoing_deliveries);
+ }
+}
+
+size_t pn_session_incoming_window(pn_session_t *ssn)
+{
+ uint32_t size = ssn->connection->transport->local_max_frame;
+ if (!size) {
+ return 2147483647; // biggest legal value
+ } else {
+ return (ssn->incoming_capacity - ssn->incoming_bytes)/size;
+ }
+}
+
pn_state_t pn_session_state(pn_session_t *session)
{
return session->endpoint.state;
@@ -1228,6 +1274,7 @@ void pn_advance_sender(pn_link_t *link)
link->current->done = true;
link->queued++;
link->credit--;
+ link->session->outgoing_deliveries++;
pn_add_tpwork(link->current);
link->current = link->current->unsettled_next;
}
@@ -1236,6 +1283,16 @@ void pn_advance_receiver(pn_link_t *link
{
link->credit--;
link->queued--;
+ link->session->incoming_deliveries--;
+
+ pn_delivery_t *current = link->current;
+ link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+ pn_buffer_clear(current->bytes);
+
+ if (!link->session->state.incoming_window) {
+ pn_add_tpwork(current);
+ }
+
link->current = link->current->unsettled_next;
}
@@ -1553,6 +1610,11 @@ int pn_do_transfer(pn_dispatcher_t *disp
&more);
if (err) return err;
pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+
+ if (!ssn->state.incoming_window) {
+ return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
+ }
+
pn_link_t *link = pn_handle_state(ssn, handle);
pn_delivery_t *delivery;
if (link->unsettled_tail && !link->unsettled_tail->done) {
@@ -1560,13 +1622,10 @@ int pn_do_transfer(pn_dispatcher_t *disp
} else {
pn_delivery_map_t *incoming = &ssn->state.incoming;
- if (!pn_delivery_map_available(incoming)) {
- return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
- }
-
if (!ssn->state.incoming_init) {
incoming->next = id;
ssn->state.incoming_init = true;
+ ssn->incoming_deliveries++;
}
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
@@ -1586,11 +1645,13 @@ int pn_do_transfer(pn_dispatcher_t *disp
}
pn_buffer_append(delivery->bytes, disp->payload, disp->size);
+ ssn->incoming_bytes += disp->size;
delivery->done = !more;
ssn->state.incoming_transfer_count++;
ssn->state.incoming_window--;
+ // XXX: need better policy for when to refresh window
if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) {
pn_post_flow(transport, ssn, link);
}
@@ -1613,9 +1674,9 @@ int pn_do_flow(pn_dispatcher_t *disp)
pn_session_t *ssn = pn_channel_state(transport, disp->channel);
if (inext_init) {
- ssn->state.outgoing_window = inext + iwin - ssn->state.outgoing_transfer_count;
+ ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count;
} else {
- ssn->state.outgoing_window = iwin;
+ ssn->state.remote_incoming_window = iwin;
}
if (handle_init) {
@@ -1983,11 +2044,13 @@ int pn_process_ssn_setup(pn_transport_t
if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
uint16_t channel = allocate_alias(transport->local_channels);
+ state->incoming_window = pn_session_incoming_window(ssn);
+ state->outgoing_window = pn_session_outgoing_window(ssn);
pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
((int16_t) state->remote_channel >= 0), state->remote_channel,
state->outgoing_transfer_count,
- pn_delivery_map_available(&state->incoming),
- pn_delivery_map_available(&state->outgoing));
+ state->incoming_window,
+ state->outgoing_window);
state->local_channel = channel;
pn_hash_put(transport->local_channels, channel, ssn);
}
@@ -2058,14 +2121,15 @@ int pn_process_link_setup(pn_transport_t
int pn_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link)
{
- ssn->state.incoming_window = pn_delivery_map_available(&ssn->state.incoming);
+ ssn->state.incoming_window = pn_session_incoming_window(ssn);
+ ssn->state.outgoing_window = pn_session_outgoing_window(ssn);
bool linkq = (bool) link;
pn_link_state_t *state = &link->state;
return pn_post_frame(transport->disp, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW,
(int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count,
ssn->state.incoming_window,
- ssn->state.outgoing.next,
- pn_delivery_map_available(&ssn->state.outgoing),
+ ssn->state.outgoing_transfer_count,
+ ssn->state.outgoing_window,
linkq, linkq ? state->local_handle : 0,
linkq, linkq ? state->delivery_count : 0,
linkq, linkq ? state->link_credit : 0,
@@ -2171,33 +2235,40 @@ int pn_process_tpwork_sender(pn_transpor
pn_link_state_t *link_state = &link->state;
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
- if (!(*allocation_blocked) && !state && pn_delivery_map_available(&ssn_state->outgoing)) {
+ if (!(*allocation_blocked) && !state) {
state = pn_delivery_map_push(&ssn_state->outgoing, delivery);
} else {
+ // XXX: I'm pretty sure this can never actually happen, however
+ // we may need some logic to block allocation if a delivery is
+ // blocked by link credit
*allocation_blocked = true;
}
if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
- ssn_state->outgoing_window > 0 && link_state->link_credit > 0) {
+ ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) {
pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
pn_set_payload(transport->disp, bytes.start, bytes.size);
- pn_buffer_clear(delivery->bytes);
+ link->session->outgoing_bytes -= bytes.size;
pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
- int err = pn_post_transfer_frame(transport->disp,
- ssn_state->local_channel,
- link_state->local_handle,
- state->id, &tag,
- 0, // message-format
- delivery->local_settled,
- !delivery->done);
- if (err) return err;
- ssn_state->outgoing_transfer_count++;
- ssn_state->outgoing_window--;
- if (delivery->done) {
+ int count = pn_post_transfer_frame(transport->disp,
+ ssn_state->local_channel,
+ link_state->local_handle,
+ state->id, &tag,
+ 0, // message-format
+ delivery->local_settled,
+ !delivery->done,
+ ssn_state->remote_incoming_window);
+ if (count < 0) return count;
+ ssn_state->outgoing_transfer_count += count;
+ ssn_state->remote_incoming_window -= count;
+
+ pn_buffer_trim(delivery->bytes, bytes.size - transport->disp->output_size, 0);
+ if (!pn_buffer_size(delivery->bytes) && delivery->done) {
state->sent = true;
link_state->delivery_count++;
link_state->link_credit--;
link->queued--;
+ link->session->outgoing_deliveries--;
}
}
}
@@ -2228,13 +2299,13 @@ int pn_process_tpwork_receiver(pn_transp
}
if (delivery->local_settled) {
- size_t available = pn_delivery_map_available(&ssn->state.incoming);
pn_full_settle(&ssn->state.incoming, delivery);
- if (!ssn->state.incoming_window &&
- pn_delivery_map_available(&ssn->state.incoming) > available) {
- int err = pn_post_flow(transport, ssn, NULL);
- if (err) return err;
- }
+ }
+
+ // XXX: need to centralize this policy and improve it
+ if (!ssn->state.incoming_window) {
+ int err = pn_post_flow(transport, ssn, link);
+ if (err) return err;
}
return 0;
@@ -2621,6 +2692,7 @@ ssize_t pn_link_send(pn_link_t *sender,
pn_delivery_t *current = pn_link_current(sender);
if (!current) return PN_EOS;
pn_buffer_append(current->bytes, bytes, n);
+ sender->session->outgoing_bytes += n;
pn_add_tpwork(current);
return n;
}
@@ -2643,6 +2715,10 @@ ssize_t pn_link_recv(pn_link_t *receiver
size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
pn_buffer_trim(delivery->bytes, size, 0);
if (size) {
+ receiver->session->incoming_bytes -= size;
+ if (!receiver->session->state.incoming_window) {
+ pn_add_tpwork(delivery);
+ }
return size;
} else {
return delivery->done ? PN_EOS : 0;
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Jun 11 09:41:15 2013
@@ -446,7 +446,9 @@ int pni_pump_in(pn_messenger_t *messenge
char *encoded = pn_buffer_bytes(buf).start;
ssize_t n = pn_link_recv(receiver, encoded, pending);
if (n != (ssize_t) pending) {
- return pn_error_format(messenger->error, n, "didn't receive pending bytes: %" PN_ZI, n);
+ return pn_error_format(messenger->error, n,
+ "didn't receive pending bytes: %" PN_ZI " %" PN_ZI,
+ n, pending);
}
n = pn_link_recv(receiver, encoded + pending, 1);
pn_link_advance(receiver);
@@ -478,7 +480,10 @@ void pn_messenger_endpoints(pn_messenger
}
pn_delivery_clear(d);
if (pn_delivery_readable(d)) {
- pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
+ int err = pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
+ if (err) {
+ fprintf(stderr, "%s\n", pn_messenger_error(messenger));
+ }
}
d = pn_work_next(d);
}
@@ -947,12 +952,6 @@ int pn_messenger_get_outgoing_window(pn_
int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
{
- if (window >= PN_SESSION_WINDOW) {
- return pn_error_format(messenger->error, PN_ARG_ERR,
- "specified window (%i) exceeds max (%i)",
- window, PN_SESSION_WINDOW);
- }
-
pni_store_set_window(messenger->outgoing, window);
return 0;
}
@@ -964,12 +963,6 @@ int pn_messenger_get_incoming_window(pn_
int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
{
- if (window >= PN_SESSION_WINDOW) {
- return pn_error_format(messenger->error, PN_ARG_ERR,
- "specified window (%i) exceeds max (%i)",
- window, PN_SESSION_WINDOW);
- }
-
pni_store_set_window(messenger->incoming, window);
return 0;
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Jun 11 09:41:15 2013
@@ -99,4 +99,7 @@ public interface Delivery
public boolean isUpdated();
public boolean isPartial();
+
+ public int pending();
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Transport.java Tue Jun 11 09:41:15 2013
@@ -67,7 +67,7 @@ public interface Transport extends Endpo
/** the lower bound for the agreed maximum frame size (in bytes). */
public int MIN_MAX_FRAME_SIZE = 512;
- public int SESSION_WINDOW = 1024;
+ public int SESSION_WINDOW = 16*1024;
public int END_OF_STREAM = -1;
public void bind(Connection connection);
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Jun 11 09:41:15 2013
@@ -56,8 +56,6 @@ class Constant(object):
class Skipped(Exception):
skipped = True
-PN_SESSION_WINDOW = JTransport.SESSION_WINDOW
-
PENDING = "PENDING"
ACCEPTED = "ACCEPTED"
REJECTED = "REJECTED"
@@ -508,6 +506,10 @@ class Delivery(object):
def work_next(self):
return wrap_delivery(self.impl.getWorkNext())
+ @property
+ def pending(self):
+ return self.impl.pending()
+
class Transport(object):
TRACE_OFF = 0
@@ -1461,7 +1463,6 @@ __all__ = [
"MANUAL",
"PENDING",
"REJECTED",
- "PN_SESSION_WINDOW",
"char",
"Condition",
"Connection",
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Jun 11 09:41:15 2013
@@ -486,4 +486,9 @@ public class DeliveryImpl implements Del
return builder.toString();
}
+ public int pending()
+ {
+ return _dataSize;
+ }
+
}
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Jun 11 09:41:15 2013
@@ -899,7 +899,7 @@ class IdleTimeoutTest(Test):
class CreditTest(Test):
def setup(self):
- self.snd, self.rcv = self.link("test-link")
+ self.snd, self.rcv = self.link("test-link", max_frame=(16*1024, 16*1024))
self.c1 = self.snd.session.connection
self.c2 = self.rcv.session.connection
self.snd.open()
@@ -909,7 +909,7 @@ class CreditTest(Test):
def teardown(self):
self.cleanup()
- def testCreditSender(self):
+ def testCreditSender(self, count=1024):
credit = self.snd.credit
assert credit == 0, credit
self.rcv.flow(10)
@@ -917,10 +917,10 @@ class CreditTest(Test):
credit = self.snd.credit
assert credit == 10, credit
- self.rcv.flow(PN_SESSION_WINDOW)
+ self.rcv.flow(count)
self.pump()
credit = self.snd.credit
- assert credit == 10 + PN_SESSION_WINDOW, credit
+ assert credit == 10 + count, credit
def testCreditReceiver(self):
self.rcv.flow(10)
@@ -939,70 +939,6 @@ class CreditTest(Test):
assert self.rcv.credit == 9, self.rcv.credit
assert self.rcv.queued == 0, self.rcv.queued
- def settle(self):
- result = []
- d = self.c1.work_head
- while d:
- if d.updated:
- result.append(d.tag)
- d.settle()
- d = d.work_next
- return result
-
- def testBuffering(self):
- self.rcv.flow(PN_SESSION_WINDOW + 10)
- self.pump()
-
- assert self.rcv.queued == 0, self.rcv.queued
-
- idx = 0
- while self.snd.credit:
- d = self.snd.delivery("tag%s" % idx)
- assert d
- assert self.snd.advance()
- self.pump()
- idx += 1
-
- assert idx == PN_SESSION_WINDOW + 10, idx
-
- assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
- extra = self.snd.delivery("extra")
- assert extra
- assert self.snd.advance()
- self.pump()
-
- assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
- for i in range(10):
- d = self.rcv.current
- assert d.tag == "tag%s" % i, d.tag
- assert self.rcv.advance()
- d.settle()
- self.pump()
- assert self.rcv.queued == PN_SESSION_WINDOW - (i+1), self.rcv.queued
-
- tags = self.settle()
- assert tags == ["tag%s" % i for i in range(10)], tags
- self.pump()
-
- assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
- for i in range(PN_SESSION_WINDOW):
- d = self.rcv.current
- assert d, i
- assert d.tag == "tag%s" % (i+10), d.tag
- assert self.rcv.advance()
- d.settle()
- self.pump()
-
- assert self.rcv.queued == 0, self.rcv.queued
-
- tags = self.settle()
- assert tags == ["tag%s" % (i+10) for i in range(PN_SESSION_WINDOW)]
-
- assert self.rcv.queued == 0, self.rcv.queued
-
def _testBufferingOnClose(self, a, b):
for i in range(10):
d = self.snd.delivery("tag-%s" % i)
@@ -1053,27 +989,6 @@ class CreditTest(Test):
def testBufferingOnCloseConnectionConnection(self):
self._testBufferingOnClose("connection", "connection")
- def testCreditWithBuffering(self):
- self.rcv.flow(PN_SESSION_WINDOW + 10)
- self.pump()
- assert self.snd.credit == PN_SESSION_WINDOW + 10, self.snd.credit
- assert self.rcv.queued == 0, self.rcv.queued
-
- idx = 0
- while self.snd.credit:
- d = self.snd.delivery("tag%s" % idx)
- assert d
- assert self.snd.advance()
- self.pump()
- idx += 1
-
- assert idx == PN_SESSION_WINDOW + 10, idx
- assert self.rcv.queued == PN_SESSION_WINDOW, self.rcv.queued
-
- self.rcv.flow(1)
- self.pump()
- assert self.snd.credit == 1, self.snd.credit
-
def testFullDrain(self):
assert self.rcv.credit == 0
assert self.snd.credit == 0
@@ -1197,6 +1112,116 @@ class CreditTest(Test):
assert self.rcv.credit == 0
assert self.rcv.queued == 0
+class SessionCreditTest(Test):
+
+ def teardown(self):
+ self.cleanup()
+
+ def testBuffering(self, count=32, size=1024, capacity=16*1024, max_frame=1024):
+ snd, rcv = self.link("test-link", max_frame=(max_frame, max_frame))
+ rcv.session.incoming_capacity = capacity
+ snd.open()
+ rcv.open()
+ rcv.flow(count)
+ self.pump()
+
+ assert count > 0
+
+ total_bytes = count * size
+
+ assert snd.session.outgoing_bytes == 0, snd.session.outgoing_bytes
+ assert rcv.session.incoming_bytes == 0, rcv.session.incoming_bytes
+ assert snd.queued == 0, snd.queued
+ assert rcv.queued == 0, rcv.queued
+
+ idx = 0
+ while snd.credit:
+ d = snd.delivery("tag%s" % idx)
+ assert d
+ n = snd.send("x"*size)
+ assert n == size, (n, size)
+ assert snd.advance()
+ self.pump()
+ idx += 1
+
+ assert idx == count, (idx, count)
+
+ assert snd.session.outgoing_bytes < total_bytes, (snd.session.outgoing_bytes, total_bytes)
+ assert rcv.session.incoming_bytes < capacity, (rcv.session.incoming_bytes, capacity)
+ if snd.session.outgoing_bytes > 0:
+ available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
+ assert available < max_frame, available
+
+ for i in range(count):
+ d = rcv.current
+ pending = d.pending
+ before = rcv.session.incoming_bytes
+ assert rcv.advance()
+ after = rcv.session.incoming_bytes
+ assert before - after == pending
+ snd_before = snd.session.incoming_bytes
+ self.pump()
+ snd_after = snd.session.incoming_bytes
+
+ assert rcv.session.incoming_bytes < capacity
+ if snd_before > 0:
+ assert capacity - after <= max_frame
+ assert snd_before > snd_after
+ if snd_after > 0:
+ available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
+ assert available < max_frame, available
+
+ def testBufferingSize16(self):
+ self.testBuffering(size=16)
+
+ def testBufferingSize256(self):
+ self.testBuffering(size=256)
+
+ def testBufferingSize512(self):
+ self.testBuffering(size=512)
+
+ def testBufferingSize2048(self):
+ self.testBuffering(size=2048)
+
+ def testBufferingSize1025(self):
+ self.testBuffering(size=1025)
+
+ def testBufferingSize1023(self):
+ self.testBuffering(size=1023)
+
+ def testBufferingSize989(self):
+ self.testBuffering(size=989)
+
+ def testBufferingSize1059(self):
+ self.testBuffering(size=1059)
+
+ def testCreditWithBuffering(self):
+ snd, rcv = self.link("test-link", max_frame=(1024, 1024))
+ rcv.session.incoming_capacity = 64*1024
+ snd.open()
+ rcv.open()
+ rcv.flow(128)
+ self.pump()
+
+ assert snd.credit == 128, snd.credit
+ assert rcv.queued == 0, rcv.queued
+
+ idx = 0
+ while snd.credit:
+ d = snd.delivery("tag%s" % idx)
+ snd.send("x"*1024)
+ assert d
+ assert snd.advance()
+ self.pump()
+ idx += 1
+
+ assert idx == 128, idx
+ assert rcv.queued < 128, rcv.queued
+
+ rcv.flow(1)
+ self.pump()
+ assert snd.credit == 1, snd.credit
+
class SettlementTest(Test):
def setup(self):
@@ -1266,6 +1291,53 @@ class SettlementTest(Test):
assert self.snd.unsettled == 1, self.snd.unsettled
assert self.rcv.unsettled == 0, self.rcv.unsettled
+ def testMultipleUnsettled(self, count=1024, size=1024):
+ self.rcv.flow(count)
+ self.pump()
+
+ assert self.snd.unsettled == 0, self.snd.unsettled
+ assert self.rcv.unsettled == 0, self.rcv.unsettled
+
+ unsettled = []
+
+ for i in range(count):
+ sd = self.snd.delivery("tag%s" % i)
+ assert sd
+ n = self.snd.send("x"*size)
+ assert n == size, n
+ assert self.snd.advance()
+ self.pump()
+
+ rd = self.rcv.current
+ assert rd, "did not receive delivery %s" % i
+ n = rd.pending
+ b = self.rcv.recv(n)
+ assert len(b) == n, (b, n)
+ rd.update(Delivery.ACCEPTED)
+ assert self.rcv.advance()
+ self.pump()
+ unsettled.append(rd)
+
+ assert self.rcv.unsettled == count
+
+ for rd in unsettled:
+ rd.settle()
+
+ def testMultipleUnsettled2K1K(self):
+ self.testMultipleUnsettled(2048, 1024)
+
+ def testMultipleUnsettled4K1K(self):
+ self.testMultipleUnsettled(4096, 1024)
+
+ def testMultipleUnsettled1K2K(self):
+ self.testMultipleUnsettled(1024, 2048)
+
+ def testMultipleUnsettled2K2K(self):
+ self.testMultipleUnsettled(2048, 2048)
+
+ def testMultipleUnsettled4K2K(self):
+ self.testMultipleUnsettled(4096, 2048)
+
class PipelineTest(Test):
def setup(self):
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1491719&r1=1491718&r2=1491719&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Jun 11 09:41:15 2013
@@ -285,43 +285,45 @@ class MessengerTest(Test):
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
- def testIncomingQueueBiggerThanWindow(self):
+ def testIncomingQueueBiggerThanWindow(self, size=10):
if IMPLEMENTATION_LANGUAGE == "Java":
# Currently fails with proton-j. See https://issues.apache.org/jira/browse/PROTON-315
raise Skipped
- self.server.outgoing_window = 10
- self.client.incoming_window = 10
+ self.server.outgoing_window = size
+ self.client.incoming_window = size
self.start()
msg = Message()
msg.address = "amqp://0.0.0.0:12345"
msg.subject = "Hello World!"
- for i in range(20):
+ for i in range(2*size):
self.client.put(msg)
- while self.client.incoming < 20:
- self.client.recv(20 - self.client.incoming)
-
trackers = []
- while self.client.incoming:
- t = self.client.get(msg)
- assert self.client.status(t) is PENDING, (t, self.client.status(t))
- trackers.append(t)
+ while len(trackers) < 2*size:
+ self.client.recv(2*size - len(trackers))
+ while self.client.incoming:
+ t = self.client.get(msg)
+ assert self.client.status(t) is PENDING, (t, self.client.status(t))
+ trackers.append(t)
- for t in trackers[:10]:
+ for t in trackers[:size]:
assert self.client.status(t) is None, (t, self.client.status(t))
- for t in trackers[10:]:
+ for t in trackers[size:]:
assert self.client.status(t) is PENDING, (t, self.client.status(t))
self.client.accept()
- for t in trackers[:10]:
+ for t in trackers[:size]:
assert self.client.status(t) is None, (t, self.client.status(t))
- for t in trackers[10:]:
+ for t in trackers[size:]:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+ def testIncomingQueueBiggerThanSessionWindow(self):
+ self.testIncomingQueueBiggerThanWindow(2048)
+
def test_proton222(self):
self.start()
msg = Message()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org