You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2020/12/11 18:47:35 UTC

[qpid-proton] branch master updated: PROTON-1914: early settlement of inbound streamed message. This closes #279

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 72fccd6  PROTON-1914: early settlement of inbound streamed message.  This closes #279
     new 3c638f9  PROTON-1914: early settlement of inbound streamed message.  This closes #279
72fccd6 is described below

commit 72fccd66580dc510540010794e1f05f8a57183f2
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Fri Dec 11 10:29:30 2020 -0800

    PROTON-1914: early settlement of inbound streamed message.  This closes #279
---
 c/src/core/engine-internal.h       |  2 +
 c/src/core/engine.c                |  2 +
 c/src/core/transport.c             | 78 ++++++++++++++++++++++++++++----------
 c/tests/connection_driver_test.cpp | 23 +++++++----
 4 files changed, 78 insertions(+), 27 deletions(-)

diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 11718c9..832d29d 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -305,6 +305,7 @@ struct pn_link_t {
   pn_sequence_t available;
   pn_sequence_t credit;
   pn_sequence_t queued;
+  pn_sequence_t more_id;
   int drained; // number of drained credits
   uint8_t snd_settle_mode;
   uint8_t rcv_settle_mode;
@@ -313,6 +314,7 @@ struct pn_link_t {
   bool drain_flag_mode; // receiver only
   bool drain;
   bool detached;
+  bool more_pending;
 };
 
 struct pn_disposition_t {
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index bfc8613..1aa1992 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1177,6 +1177,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
   link->available = 0;
   link->credit = 0;
   link->queued = 0;
+  link->more_id = 0;
   link->drain = false;
   link->drain_flag_mode = true;
   link->drained = 0;
@@ -1186,6 +1187,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
   link->remote_snd_settle_mode = PN_SND_MIXED;
   link->remote_rcv_settle_mode = PN_RCV_FIRST;
   link->detached = false;
+  link->more_pending = false;
   link->properties = 0;
   link->remote_properties = 0;
 
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index fe6ebf1..0467eef 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1519,12 +1519,36 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   if (!link) {
     return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
   }
-  pn_delivery_t *delivery;
-  if (link->unsettled_tail && !link->unsettled_tail->done) {
-    delivery = link->unsettled_tail;
-    if (settled_set && !settled && delivery->remote.settled)
-      return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
+  pn_delivery_t *delivery = NULL;
+  bool new_delivery = false;
+  if (link->more_pending) {
+    // Ongoing multiframe delivery.
+    if (link->unsettled_tail && !link->unsettled_tail->done) {
+      delivery = link->unsettled_tail;
+      if (settled_set && !settled && delivery->remote.settled)
+        return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
+      if (id_present && id != delivery->state.id)
+        return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer");
+    } else {
+      // Application has already settled: the in-progress delivery is no longer
+      // the unsettled tail.  Ignore content and look for transition to a new delivery.
+      if (!id_present || id == link->more_id) {
+        // Still old delivery.
+        if (!more || aborted)
+          link->more_pending = false;
+      } else {
+        // New id.
+        new_delivery = true;
+        link->more_pending = false;
+      }
+    }
   } else {
+    new_delivery = true;
+  }
+
+  if (new_delivery) {
+    assert(!link->more_pending);
+    assert(delivery == NULL);
     pn_delivery_map_t *incoming = &ssn->state.incoming;
 
     if (!ssn->state.incoming_init) {
@@ -1550,17 +1574,38 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
     link->queued++;
   }
 
-  pn_buffer_append(delivery->bytes, payload->start, payload->size);
-  ssn->incoming_bytes += payload->size;
-  delivery->done = !more;
+  if (delivery) {
+    pn_buffer_append(delivery->bytes, payload->start, payload->size);
+    if (more) {
+      if (!link->more_pending) {
+        // First frame of a multi-frame transfer. Remember at link level.
+        link->more_pending = true;
+        assert(id_present);  // Id MUST be set on first frame, and already checked above.
+        link->more_id = id;
+      }
+      delivery->done = false;
+    }
+    else
+      delivery->done = true;
+
+    // XXX: need to fill in remote state: delivery->remote.state = ...;
+    if (settled && !delivery->remote.settled) {
+      delivery->remote.settled = settled;
+      delivery->updated = true;
+      pn_work_update(transport->connection, delivery);
+    }
 
-  // XXX: need to fill in remote state: delivery->remote.state = ...;
-  if (settled && !delivery->remote.settled) {
-    delivery->remote.settled = settled;
-    delivery->updated = true;
-    pn_work_update(transport->connection, delivery);
+    if ((delivery->aborted = aborted)) {
+      delivery->remote.settled = true;
+      delivery->done = true;
+      delivery->updated = true;
+      link->more_pending = false;
+      pn_work_update(transport->connection, delivery);
+    }
+    pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
   }
 
+  ssn->incoming_bytes += payload->size;
   ssn->state.incoming_transfer_count++;
   ssn->state.incoming_window--;
 
@@ -1569,13 +1614,6 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
     pni_post_flow(transport, ssn, link);
   }
 
-  if ((delivery->aborted = aborted)) {
-    delivery->remote.settled = true;
-    delivery->done = true;
-    delivery->updated = true;
-    pn_work_update(transport->connection, delivery);
-  }
-  pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY);
   return 0;
 }
 
diff --git a/c/tests/connection_driver_test.cpp b/c/tests/connection_driver_test.cpp
index f8e3345..8a86db5 100644
--- a/c/tests/connection_driver_test.cpp
+++ b/c/tests/connection_driver_test.cpp
@@ -1,6 +1,6 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- h * or more contributor license agreements.  See the NOTICE file
+ * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
@@ -525,10 +525,9 @@ TEST_CASE("driver_duplicate_link_client", "[!hide][!shouldfail]") {
              cond_empty());
 }
 
-/* Settling an incomplete delivery should not cause an error
-   This test will fail till PROTON-1914 is fixed
+/* Settling an incomplete delivery should not cause an error.
 */
-TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") {
+TEST_CASE("driver_settle_incomplete_receiver") {
   send_client_handler client;
   delivery_handler server;
   pn_test::driver_pair d(client, server);
@@ -544,24 +543,34 @@ TEST_CASE("driver_settle_incomplete_receiver", "[!hide][!shouldfail]") {
 
   /* Send/receive a frame */
   CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
+  server.log_clear();
   d.run();
   CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
   CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data)));
   d.run();
 
-  /* Settle the receiver's delivery */
+  /* Settle early while the sender is still sending */
   pn_delivery_settle(pn_link_current(rcv));
+  CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
   d.run();
   CHECK_THAT(*pn_connection_remote_condition(d.client.connection),
              cond_empty());
   CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());
 
-  /* Send/receive a frame, should not cause error */
+  pn_delivery_settle(pn_link_current(snd));
+
+  /* Send/receive a new message, should not cause error */
+  pn_link_flow(rcv, 1);
+  d.run();
+  pn_delivery(snd, pn_bytes("2")); /* Prepare to send */
   CHECK(sizeof(data) == pn_link_send(snd, data, sizeof(data)));
+  server.log_clear();
   d.run();
   CHECK_THAT(ETYPES(PN_DELIVERY), Equals(server.log_clear()));
   CHECK(sizeof(data) == pn_link_recv(rcv, rbuf, sizeof(data)));
-  d.run();
+  pn_delivery_tag_t tag = pn_delivery_tag(pn_link_current(rcv));
+  CHECK(tag.size == 1);
+  CHECK(tag.start[0] == '2');
   CHECK_THAT(*pn_connection_remote_condition(d.client.connection),
              cond_empty());
   CHECK_THAT(*pn_connection_condition(d.server.connection), cond_empty());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org