You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2020/12/11 18:47:35 UTC
[qpid-proton] branch master updated: PROTON-1914: early settlement of inbound streamed message. This closes #279
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 72fccd6 PROTON-1914: early settlement of inbound streamed message. This closes #279
new 3c638f9 PROTON-1914: early settlement of inbound streamed message. This closes #279
72fccd6 is described below
commit 72fccd66580dc510540010794e1f05f8a57183f2
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Fri Dec 11 10:29:30 2020 -0800
PROTON-1914: early settlement of inbound streamed message. This closes #279
---
c/src/core/engine-internal.h | 2 +
c/src/core/engine.c | 2 +
c/src/core/transport.c | 78 ++++++++++++++++++++++++++++----------
c/tests/connection_driver_test.cpp | 23 +++++++----
4 files changed, 78 insertions(+), 27 deletions(-)
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 11718c9..832d29d 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -305,6 +305,7 @@ struct pn_link_t {
pn_sequence_t available;
pn_sequence_t credit;
pn_sequence_t queued;
+ pn_sequence_t more_id;
int drained; // number of drained credits
uint8_t snd_settle_mode;
uint8_t rcv_settle_mode;
@@ -313,6 +314,7 @@ struct pn_link_t {
bool drain_flag_mode; // receiver only
bool drain;
bool detached;
+ bool more_pending;
};
struct pn_disposition_t {
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index bfc8613..1aa1992 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1177,6 +1177,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
link->available = 0;
link->credit = 0;
link->queued = 0;
+ link->more_id = 0;
link->drain = false;
link->drain_flag_mode = true;
link->drained = 0;
@@ -1186,6 +1187,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
link->remote_snd_settle_mode = PN_SND_MIXED;
link->remote_rcv_settle_mode = PN_RCV_FIRST;
link->detached = false;
+ link->more_pending = false;
link->properties = 0;
link->remote_properties = 0;
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index fe6ebf1..0467eef 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1519,12 +1519,36 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
if (!link) {
return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
}
- pn_delivery_t *delivery;
- if (link->unsettled_tail && !link->unsettled_tail->done) {
- delivery = link->unsettled_tail;
- if (settled_set && !settled && delivery->remote.settled)
- return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
+ pn_delivery_t *delivery = NULL;
+ bool new_delivery = false;
+ if (link->more_pending) {
+ // Ongoing multiframe delivery.
+ if (link->unsettled_tail && !link->unsettled_tail->done) {
+ delivery = link->unsettled_tail;
+ if (settled_set && !settled && delivery->remote.settled)
+ return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
+ if (id_present && id != delivery->state.id)
+ return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer");
+ } else {
+ // Application has already settled. Delivery is no more.
+ // Ignore content and look for transition to a new delivery.
+ if (!id_present || id == link->more_id) {
+ // Still old delivery.
+ if (!more || aborted)
+ link->more_pending = false;
+ } else {
+ // New id.
+ new_delivery = true;
+ link->more_pending = false;
+ }
+ }
} else {
+ new_delivery = true;
+ }
+
+ if (new_delivery) {
+ assert(!link->more_pending);
+ assert(delivery == NULL);
pn_delivery_map_t *incoming = &ssn->state.incoming;
if (!ssn->state.incoming_init) {
@@ -1550,17 +1574,38 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
link->queued++;
}
- pn_buffer_append(delivery->bytes, payload->start, payload->size);
- ssn->incoming_bytes += payload->size;
- delivery->done = !more;
+ if (delivery) {
+ pn_buffer_append(delivery->bytes, payload->start, payload->size);
+ if (more) {
+ if (!link->more_pending) {
+ // First frame of a multi-frame transfer. Remember at link level.
+ link->more_pending = true;
+ assert(id_present); // Id MUST be set on first frame, and already checked above.
+ link->more_id = id;
+ }
+ delivery->done = false;
+ }
+ else
+ delivery->done = true;
+
+ // XXX: need to fill in remote state: delivery->remote.state = ...;
+ if (settled && !delivery->remote.settled) {
+ delivery->remote.settled = settled;
+ delivery->updated = true;
+ pn_work_update(transport->connection, delivery);
+ }
- // XXX: need to fill in remote state: delivery->remote.state = ...;
- if (settled && !delivery->remote.settled) {
- delivery->remote.settled = settled;
- delivery->updated = true;
- pn_work_update(transport->connection, delivery);
+ if ((delivery->aborted = aborted)) {
+ delivery->remote.settled = true;
+ delivery->done = true;
+ delivery->updated = true;
+ link->more_pending = false;
+ pn_work_update(transport->connection, delivery);
+ }
+ pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
}
+ ssn->incoming_bytes += payload->size;
ssn->state.incoming_transfer_count++;
ssn->state.incoming_window--;
@@ -1569,13 +1614,6 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
pni_post_flow(transport, ssn, link);
}
- if ((delivery->aborted = aborted)) {
- delivery->remote.settled = true;
- delivery->done = true;
- delivery->updated = true;
- pn_work_update(transport->connection, delivery);
- }
- pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
return 0;
}
diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp
index f8e3345..8a86db5 100644
--- a/c/tests/connection_driver_test.cpp
+++ b/c/tests/connection_driver_test.cpp
@@ -1,6 +1,6 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- h * or more contributor license agreements. See the NOTICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
@@ -525,10 +525,9 @@ TEST_CASE("driver_duplicate_link_client", "[!hide][!shouldfail]") {
cond_empty());
}
-/* Settling an incomplete delivery should not cause an error
- This test will fail till PROTON-1914 is fixed
+/* Settling an incomplete delivery should not cause an error.
*/
-TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") {
+TEST_CASE("driver_settle_incomplete_receiver") {
send_client_handler client;
delivery_handler server;
pn_test::driver_pair d(client, server);
@@ -544,24 +543,34 @@ TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") {
/* Send/receive a frame */
CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
+ server.log_clear();
d.run();
CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data)));
d.run();
- /* Settle the receiver's delivery */
+ /* Settle early while the sender is still sending */
pn_delivery_settle(pn_link_current(rcv));
+ CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
d.run();
CHECK_THAT(*pn_connection_remote_condition(d.client.connection),
cond_empty());
CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());
- /* Send/receive a frame, should not cause error */
+ pn_delivery_settle(pn_link_current(snd));
+
+ /* Send/receive a new message, should not cause error */
+ pn_link_flow(rcv, 1);
+ d.run();
+ pn_delivery(snd, pn_bytes("2")); /* Prepare to send */
CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
+ server.log_clear();
d.run();
CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data)));
- d.run();
+ pn_delivery_tag_t tag = pn_delivery_tag(pn_link_current(rcv));
+ CHECK(tag.size == 1);
+ CHECK(tag.start[0] == '2');
CHECK_THAT(*pn_connection_remote_condition(d.client.connection),
cond_empty());
CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org